# Analyzing Distributed Data with Spark

## Initializing Spark Application

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql.functions import to_timestamp, to_date, year

In [2]:
# Spark session & context
# Create (or reuse) the session for this exercise on the master at spark-master:7077.
spark = (
    SparkSession.builder
    .master('spark://spark-master:7077')
    .appName("uebung_26")
    # NOTE(review): looks like a placeholder option - confirm whether it is needed.
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
# Keep a handle on the underlying SparkContext as well.
sc = spark.sparkContext

In [6]:
# Helper: load a CSV file and inspect it
def read_csv_file(path, sep=',', header=True):
    """Read a CSV file into a Spark DataFrame and print a quick preview.

    Uses the notebook-level ``spark`` session. Column types are inferred
    from the data (``inferSchema=True``).

    Parameters
    ----------
    path
        Location of the CSV file, passed straight to ``spark.read.csv``
        (this notebook passes HDFS URIs).
    sep : str, optional
        Field separator; defaults to ``','``.
    header : bool, optional
        Passed to the reader's ``header`` option; defaults to ``True``.

    Returns
    -------
    DataFrame
        The loaded data. As a side effect, the schema and the first
        10 rows are printed.
    """
    reader = spark.read
    loaded = reader.csv(path=path, sep=sep, header=header, inferSchema=True)
    # Print the inferred schema and a 10-row sample for a quick sanity check.
    loaded.printSchema()
    loaded.show(10)
    return loaded

## Reading in all relevant files

In [9]:
# Holidays/events file; read_csv_file also prints the schema and the first 10 rows.
holidays = read_csv_file(
    path='hdfs://namenode:8020/user/root/workspace/pyspark/holidays_events.csv'
)

root
 |-- datum_holi: string (nullable = true)
 |-- type: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- locale_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- transferred: boolean (nullable = true)

+----------+-------+--------+-----------+--------------------+-----------+
|datum_holi|   type|  locale|locale_name|         description|transferred|
+----------+-------+--------+-----------+--------------------+-----------+
|2012-03-02|Holiday|   Local|      Manta|  Fundacion de Manta|      false|
|2012-04-01|Holiday|Regional|   Cotopaxi|Provincializacion...|      false|
|2012-04-12|Holiday|   Local|     Cuenca| Fundacion de Cuenca|      false|
|2012-04-14|Holiday|   Local|   Libertad|Cantonizacion de ...|      false|
|2012-04-21|Holiday|   Local|   Riobamba|Cantonizacion de ...|      false|
|2012-05-12|Holiday|   Local|       Puyo|Cantonizacion del...|      false|
|2012-06-23|Holiday|   Local|   Guaranda|Cantonizacion de ...|      false

In [8]:
# Items file; read_csv_file also prints the schema and the first 10 rows.
items = read_csv_file(
    path='hdfs://namenode:8020/user/root/workspace/pyspark/items.csv'
)

root
 |-- item_nbr_item: integer (nullable = true)
 |-- family: string (nullable = true)
 |-- class: integer (nullable = true)
 |-- perishable: integer (nullable = true)

+-------------+------------+-----+----------+
|item_nbr_item|      family|class|perishable|
+-------------+------------+-----+----------+
|        96995|   GROCERY I| 1093|         0|
|        99197|   GROCERY I| 1067|         0|
|       103501|    CLEANING| 3008|         0|
|       103520|   GROCERY I| 1028|         0|
|       103665|BREAD/BAKERY| 2712|         1|
|       105574|   GROCERY I| 1045|         0|
|       105575|   GROCERY I| 1045|         0|
|       105576|   GROCERY I| 1045|         0|
|       105577|   GROCERY I| 1045|         0|
|       105693|   GROCERY I| 1034|         0|
+-------------+------------+-----+----------+
only showing top 10 rows



In [10]:
# Quito store sales sample; read_csv_file also prints the schema and the first 10 rows.
stores = read_csv_file(
    path='hdfs://namenode:8020/user/root/workspace/pyspark/quito_stores_sample2016-2017.csv'
)

root
 |-- id: integer (nullable = true)
 |-- date_quito: string (nullable = true)
 |-- store_nbr_quito: integer (nullable = true)
 |-- item_nbr_quito: integer (nullable = true)
 |-- unit_sales: double (nullable = true)
 |-- onpromotion: boolean (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cluster: integer (nullable = true)

+--------+----------+---------------+--------------+----------+-----------+-----+---------+-------+
|      id|date_quito|store_nbr_quito|item_nbr_quito|unit_sales|onpromotion| city|    state|cluster|
+--------+----------+---------------+--------------+----------+-----------+-----+---------+-------+
|88211471|2016-08-16|             44|        103520|       7.0|       true|Quito|Pichincha|      5|
|88211472|2016-08-16|             44|        103665|       7.0|      false|Quito|Pichincha|      5|
|88211473|2016-08-16|             44|        105574|      13.0|      false|Quito|Pichincha|      5|
|88211474|2016-08-16|  

In [13]:
# Transactions file; read_csv_file also prints the schema and the first 10 rows.
transactions = read_csv_file(
    path='hdfs://namenode:8020/user/root/workspace/pyspark/transactions.csv'
)

root
 |-- date_trans: string (nullable = true)
 |-- store_nbr_trans: integer (nullable = true)
 |-- transactions: integer (nullable = true)

+----------+---------------+------------+
|date_trans|store_nbr_trans|transactions|
+----------+---------------+------------+
|2013-01-01|             25|         770|
|2013-01-02|              1|        2111|
|2013-01-02|              2|        2358|
|2013-01-02|              3|        3487|
|2013-01-02|              4|        1922|
|2013-01-02|              5|        1903|
|2013-01-02|              6|        2143|
|2013-01-02|              7|        1874|
|2013-01-02|              8|        3250|
|2013-01-02|              9|        2940|
+----------+---------------+------------+
only showing top 10 rows



## Exercise 1
Compute the sum of `unit_sales` for the year 2017, grouped by `item_nbr_quito`.

In [29]:
# Parse date_quito (a 'yyyy-MM-dd' string) into a date column, then derive
# the calendar year from it into a new `year` column.
unit_sales = (
    stores
    .withColumn('date_quito', to_date('date_quito', 'yyyy-MM-dd'))
    .withColumn('year', year('date_quito'))
)
# show() and printSchema() print directly and return None, so wrapping them
# in print() only adds a stray "None" line to the output.
unit_sales.show(5)
unit_sales.printSchema()

+--------+----------+---------------+--------------+----------+-----------+-----+---------+-------+----+
|      id|date_quito|store_nbr_quito|item_nbr_quito|unit_sales|onpromotion| city|    state|cluster|year|
+--------+----------+---------------+--------------+----------+-----------+-----+---------+-------+----+
|88211471|2016-08-16|             44|        103520|       7.0|       true|Quito|Pichincha|      5|2016|
|88211472|2016-08-16|             44|        103665|       7.0|      false|Quito|Pichincha|      5|2016|
|88211473|2016-08-16|             44|        105574|      13.0|      false|Quito|Pichincha|      5|2016|
|88211474|2016-08-16|             44|        105575|      18.0|      false|Quito|Pichincha|      5|2016|
|88211475|2016-08-16|             44|        105577|       8.0|      false|Quito|Pichincha|      5|2016|
+--------+----------+---------------+--------------+----------+-----------+-----+---------+-------+----+
only showing top 5 rows

None
root
 |-- id: integer (nu

In [36]:
# Keep only the rows whose derived `year` is 2017 and preview them.
unit_sales_year = unit_sales.where(functions.col('year') == 2017)
unit_sales_year.show(10)

+---+----------+---------------+--------------+----------+-----------+----+-----+-------+----+
| id|date_quito|store_nbr_quito|item_nbr_quito|unit_sales|onpromotion|city|state|cluster|year|
+---+----------+---------------+--------------+----------+-----------+----+-----+-------+----+
+---+----------+---------------+--------------+----------+-----------+----+-----+-------+----+



Since there are no entries for the year 2017, I will use the year 2016 instead.

In [39]:
# Restrict to 2016, then total unit_sales per item_nbr_quito.
unit_sales_year = (
    unit_sales
    .where(functions.col('year') == 2016)
    .groupBy('item_nbr_quito')
    .sum('unit_sales')
)
unit_sales_year.show(10)

+--------------+---------------+
|item_nbr_quito|sum(unit_sales)|
+--------------+---------------+
|        454593|         1147.0|
|        459762|         3964.0|
|        692531|         2729.0|
|        699703|      17312.817|
|        759651|         2446.0|
|        867850|         1209.0|
|       1047786|        11074.0|
|       1118691|         2159.0|
|       1230417|         2220.0|
|       1471462|         1600.0|
+--------------+---------------+
only showing top 10 rows



## Exercise 2
Calculate the number of items per family

In [49]:
# Count the items in each family and list the largest families first.
family = (
    items
    .groupBy('family')
    .count()
    .orderBy(functions.desc('count'))
)
family.show(10)

+-------------+-----+
|       family|count|
+-------------+-----+
|    GROCERY I| 1334|
|    BEVERAGES|  613|
|     CLEANING|  446|
|      PRODUCE|  306|
|        DAIRY|  242|
|PERSONAL CARE|  153|
| BREAD/BAKERY|  134|
|    HOME CARE|  108|
|         DELI|   91|
|        MEATS|   84|
+-------------+-----+
only showing top 10 rows



In [None]:
# NEVER FORGET to stop the session once all work is done.
# NOTE(review): cells placed after this one can no longer rely on `spark`;
# keep this as the last executed cell.
spark.stop()

In [18]:
# Scratch lookup: show the documentation of Column.alias.
# NOTE(review): this cell sits after spark.stop(); building the column
# expression presumably needs an active Spark context - confirm, then move it
# above the stop cell or remove it from the final notebook.
parsed_date = to_date(stores['date_quito'], 'yyyy-MM-dd')
help(parsed_date.alias)

Help on method alias in module pyspark.sql.column:

alias(*alias, **kwargs) method of pyspark.sql.column.Column instance
    Returns this column aliased with a new name or names (in the case of expressions that
    return more than one column, such as explode).
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    alias : str
        desired column names (collects all positional arguments passed)
    
    Other Parameters
    ----------------
    metadata: dict
        a dict of information to be stored in ``metadata`` attribute of the
        corresponding :class:`StructField <pyspark.sql.types.StructField>` (optional, keyword
        only argument)
    
        .. versionchanged:: 2.2.0
           Added optional ``metadata`` argument.
    
    Examples
    --------
    >>> df.select(df.age.alias("age2")).collect()
    [Row(age2=2), Row(age2=5)]
    >>> df.select(df.age.alias("age3", metadata={'max': 99})).schema['age3'].metadata['max']
    99

