In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null 

In [3]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

In [4]:
!tar xf spark-3.2.1-bin-hadoop3.2.tgz

In [5]:
import os
os.environ["JAVA HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [6]:
!pip install -q findspark

In [7]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [8]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import matplotlib
import seaborn as sns
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
sns.set(style="white")


import plotly
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.figure_factory as ff

from plotly.offline import plot, iplot, init_notebook_mode
init_notebook_mode(connected=True)

In [20]:
!ls data/Stores.csv

data/Stores.csv


In [21]:
df = spark.read.csv("data/Stores.csv")

In [22]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)



In [23]:
df.show(15,False)

+---------+----------+---------------+--------------------+-----------+
|_c0      |_c1       |_c2            |_c3                 |_c4        |
+---------+----------+---------------+--------------------+-----------+
|Store ID |Store_Area|Items_Available|Daily_Customer_Count|Store_Sales|
|1        |1659      |1961           |530                 |66490      |
|2        |1461      |1752           |210                 |39820      |
|3        |1340      |1609           |720                 |54010      |
|4        |1451      |1748           |620                 |53730      |
|5        |1770      |2111           |450                 |46620      |
|6        |1442      |1733           |760                 |45260      |
|7        |1542      |1858           |1030                |72240      |
|8        |1261      |1507           |1020                |37720      |
|9        |1090      |1321           |680                 |46310      |
|10       |1030      |1235           |1130                |44150

In [14]:
df.count()

171

In [24]:
df.select("_c1", "_c2", "_c3", "_c4").show(15)

+----------+---------------+--------------------+-----------+
|       _c1|            _c2|                 _c3|        _c4|
+----------+---------------+--------------------+-----------+
|Store_Area|Items_Available|Daily_Customer_Count|Store_Sales|
|      1659|           1961|                 530|      66490|
|      1461|           1752|                 210|      39820|
|      1340|           1609|                 720|      54010|
|      1451|           1748|                 620|      53730|
|      1770|           2111|                 450|      46620|
|      1442|           1733|                 760|      45260|
|      1542|           1858|                1030|      72240|
|      1261|           1507|                1020|      37720|
|      1090|           1321|                 680|      46310|
|      1030|           1235|                1130|      44150|
|      1187|           1439|                1090|      71280|
|      1751|           2098|                 720|      57620|
|      1

In [25]:
df_filtered = df.filter("_c2 > 171 AND _c4 > 7 AND _c1 IS NOT NULL")
df_filtered.select("_c1", "_c3", "_c4", "_c2").show(100, False)

+----+----+-----+----+
|_c1 |_c3 |_c4  |_c2 |
+----+----+-----+----+
|1659|530 |66490|1961|
|1461|210 |39820|1752|
|1340|720 |54010|1609|
|1451|620 |53730|1748|
|1770|450 |46620|2111|
|1442|760 |45260|1733|
|1542|1030|72240|1858|
|1261|1020|37720|1507|
|1090|680 |46310|1321|
|1030|1130|44150|1235|
|1187|1090|71280|1439|
|1751|720 |57620|2098|
|1746|1050|60470|2064|
|1615|1160|59130|1931|
|1469|770 |66360|1756|
|1644|790 |78870|1950|
|1578|1440|77250|1907|
|1703|670 |38170|2045|
|1438|1030|63540|1731|
|1940|980 |40190|2340|
|1421|370 |43460|1700|
|1458|690 |68890|1746|
|1719|950 |52780|2065|
|1449|620 |50680|1752|
|1234|840 |41880|1488|
|1732|820 |70050|2073|
|1475|1100|25820|1777|
|1390|980 |60530|1648|
|1642|710 |78100|1943|
|1715|650 |84860|2071|
|1439|990 |80140|1746|
|1250|990 |14920|1508|
|1331|880 |60460|1608|
|1784|620 |74560|2163|
|1375|1020|72430|1648|
|1871|700 |45460|2230|
|1442|610 |41570|1744|
|1174|1080|62870|1411|
|1839|1010|55170|2204|
|1270|10  |45480|1516|
|1435|1250|

In [28]:
from pyspark.sql import functions as f
maxValue = df_filtered.agg(f.max("_c4")).collect()[0][0]
print("maxValue: ",maxValue)
df_filtered.select("_c1","_c3", "_c4", "_c2").filter(df._c4 == maxValue).show(100, False)

maxValue:  99570
+----+---+-----+----+
|_c1 |_c3|_c4  |_c2 |
+----+---+-----+----+
|1465|680|99570|1763|
+----+---+-----+----+



In [30]:
df.filter("_c4="+str(df.agg(f.min("_c4")).collect()[0][0])).show()

+---+----+----+---+------+
|_c0| _c1| _c2|_c3|   _c4|
+---+----+----+---+------+
|867|1565|1854|900|100900|
+---+----+----+---+------+



In [37]:
df.groupBy('_c3').count().sort(df._c3).show()

+----+-----+
| _c3|count|
+----+-----+
|  10|    1|
| 100|    1|
|1000|    8|
|1010|    6|
|1020|   13|
|1030|    7|
|1040|    6|
|1050|    7|
|1060|    7|
|1070|   13|
|1080|   11|
|1090|    6|
|1100|    4|
|1110|    6|
|1120|    5|
|1130|    4|
|1140|    8|
|1150|    3|
|1160|    6|
|1170|    7|
+----+-----+
only showing top 20 rows

