In [1]:
import os
os.environ['SPARK_HOME'] = r'C:\Users\Marcos\Documents\Spark'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [4]:
spark = SparkSession.builder.appName('DataFrameSQL').getOrCreate()

In [5]:
path_file = 'data/walmart-sales-dataset-of-45stores.csv'
df = spark.read.csv(path_file, header=True, inferSchema=True)

In [6]:
df.printSchema()
df.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Holiday_Flag: integer (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)

+-----+----------+------------+------------+-----------+----------+-----------+------------+
|Store|      Date|Weekly_Sales|Holiday_Flag|Temperature|Fuel_Price|        CPI|Unemployment|
+-----+----------+------------+------------+-----------+----------+-----------+------------+
|    1|05-02-2010|   1643690.9|           0|      42.31|     2.572|211.0963582|       8.106|
|    1|12-02-2010|  1641957.44|           1|      38.51|     2.548|211.2421698|       8.106|
|    1|19-02-2010|  1611968.17|           0|      39.93|     2.514|211.2891429|       8.106|
|    1|26-02-2010|  1409727.59|           0|      46.63|     2.561|211.3196429|       8.106|
|    1|05-03-201

In [8]:
df.createOrReplaceTempView('myTable')

In [16]:
result = spark.sql("SELECT * FROM myTable WHERE Date = '12-02-2010' and CPI > 212")
result.show()

+-----+----------+------------+------------+-----------+----------+-----------+------------+
|Store|      Date|Weekly_Sales|Holiday_Flag|Temperature|Fuel_Price|        CPI|Unemployment|
+-----+----------+------------+------------+-----------+----------+-----------+------------+
|    3|12-02-2010|   420728.96|           1|      47.93|     2.548|214.5747916|       7.368|
|    6|12-02-2010|  1606283.86|           1|      40.57|     2.548|212.7700425|       7.259|
|    8|12-02-2010|    994801.4|           1|      33.34|     2.548|214.6214189|       6.299|
|    9|12-02-2010|   552677.48|           1|      37.08|     2.548|214.8056534|       6.415|
|   11|12-02-2010|  1574684.08|           1|      48.01|     2.548|214.5747916|       7.368|
+-----+----------+------------+------------+-----------+----------+-----------+------------+



In [23]:
result = spark.sql("SELECT Store, AVG(Weekly_Sales) AS Average_Sales FROM myTable GROUP BY Store ORDER BY Average_Sales DESC")
result.show(10)

+-----+------------------+
|Store|     Average_Sales|
+-----+------------------+
|   20|2107676.8703496507|
|    4|2094712.9606993007|
|   14| 2020978.400979021|
|   13| 2003620.306293707|
|    2|1925751.3355244761|
|   10| 1899424.572657342|
|   27| 1775216.201958042|
|    6|1564728.1862937063|
|    1|1555264.3975524479|
|   39| 1450668.129160839|
+-----+------------------+
only showing top 10 rows



In [24]:
view_exists = spark.catalog.tableExists('myTable')
view_exists

True

In [25]:
spark.catalog.dropTempView('myTable')

True

In [27]:
path_file = 'data/walmart-sales-dataset-of-45stores.csv'
df = spark.read.csv(path_file, header=True, inferSchema=True)

In [37]:
df1 = df.drop('Date', 'Temperature', 'Fuel_Price', 'CPI')
df2 = df.drop('Holiday_Flag', 'Unemployment', 'Weekly_Sales')

In [39]:
df1.createOrReplaceTempView('sales')
df2.createOrReplaceTempView('schedule')

In [41]:
df1.show(1)
df2.show(1)

+-----+------------+------------+------------+
|Store|Weekly_Sales|Holiday_Flag|Unemployment|
+-----+------------+------------+------------+
|    1|   1643690.9|           0|       8.106|
+-----+------------+------------+------------+
only showing top 1 row

+-----+----------+-----------+----------+-----------+
|Store|      Date|Temperature|Fuel_Price|        CPI|
+-----+----------+-----------+----------+-----------+
|    1|05-02-2010|      42.31|     2.572|211.0963582|
+-----+----------+-----------+----------+-----------+
only showing top 1 row



In [44]:
result = spark.sql("""
                   SELECT Weekly_Sales FROM sales
                   WHERE Store IN (
                   SELECT Temperature FROM schedule
                   WHERE Temperature > 43
                   )
                   """)
result.show(5)

+------------+
|Weekly_Sales|
+------------+
|   890689.51|
|   656988.64|
|   841264.04|
|   741891.65|
|   777951.22|
+------------+
only showing top 5 rows



In [50]:
result = spark.sql("""
                   SELECT sales.*, schedule.Fuel_Price FROM sales
                   JOIN schedule ON sales.Store = schedule.Store
                   """)
result.show(5)

+-----+------------+------------+------------+----------+
|Store|Weekly_Sales|Holiday_Flag|Unemployment|Fuel_Price|
+-----+------------+------------+------------+----------+
|    1|   1643690.9|           0|       8.106|     3.506|
|    1|   1643690.9|           0|       8.106|     3.594|
|    1|   1643690.9|           0|       8.106|     3.601|
|    1|   1643690.9|           0|       8.106|     3.617|
|    1|   1643690.9|           0|       8.106|     3.666|
+-----+------------+------------+------------+----------+
only showing top 5 rows



In [51]:
window_spec = Window.partitionBy('Weekly_Sales').orderBy(F.desc('Fuel_Price'))

In [53]:
result.withColumn('rank', F.rank().over(window_spec)).show(5)

+-----+------------+------------+------------+----------+----+
|Store|Weekly_Sales|Holiday_Flag|Unemployment|Fuel_Price|rank|
+-----+------------+------------+------------+----------+----+
|   33|   209986.25|           0|       9.265|     4.468|   1|
|   33|   209986.25|           0|       9.265|     4.449|   2|
|   33|   209986.25|           0|       9.265|     4.308|   3|
|   33|   209986.25|           0|       9.265|     4.301|   4|
|   33|   209986.25|           0|       9.265|     4.294|   5|
+-----+------------+------------+------------+----------+----+
only showing top 5 rows



In [None]:
spark.stop()