In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark import SQLContext

In [2]:
# Start (or reuse) a Spark session for this notebook.
# NOTE: despite the name, `sc` holds a SparkSession, not a SparkContext;
# the underlying context is reachable as `sc.sparkContext`.
# "local" runs Spark in-process with a single worker thread.
sc = (
    SparkSession.builder
    .master("local")
    .appName("Wholesale+customers")
    .getOrCreate()
)

In [3]:
# Legacy SQLContext entry point wrapping the session created above; later cells
# read data through it (the SparkSession `sc` exposes an equivalent `.read`).
sqlContext = SQLContext(sc.sparkContext, sc)

#### Read the CSV file as a DataFrame (not as an RDD), then inspect its schema.


In [22]:
# Load the CSV, treating the first line as the header and letting Spark infer
# column types (schema inference makes an extra pass over the file).
train = (
    sqlContext.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('Wholesale customers data.csv')
)

#### Schema of the DataFrame.

In [23]:
# Print the column names and the types Spark inferred from the CSV.
train.printSchema()

root
 |-- Channel: integer (nullable = true)
 |-- Region: integer (nullable = true)
 |-- Fresh: integer (nullable = true)
 |-- Milk: integer (nullable = true)
 |-- Grocery: integer (nullable = true)
 |-- Frozen: integer (nullable = true)
 |-- Detergents_Paper: integer (nullable = true)
 |-- Delicassen: integer (nullable = true)



In [24]:
# Preview the first 10 rows; long cell values are truncated for display.
train.show(n=10, truncate=True)

+-------+------+-----+-----+-------+------+----------------+----------+
|Channel|Region|Fresh| Milk|Grocery|Frozen|Detergents_Paper|Delicassen|
+-------+------+-----+-----+-------+------+----------------+----------+
|      2|     3|12669| 9656|   7561|   214|            2674|      1338|
|      2|     3| 7057| 9810|   9568|  1762|            3293|      1776|
|      2|     3| 6353| 8808|   7684|  2405|            3516|      7844|
|      1|     3|13265| 1196|   4221|  6404|             507|      1788|
|      2|     3|22615| 5410|   7198|  3915|            1777|      5185|
|      2|     3| 9413| 8259|   5126|   666|            1795|      1451|
|      2|     3|12126| 3199|   6975|   480|            3140|       545|
|      2|     3| 7579| 4956|   9426|  1669|            3321|      2566|
|      1|     3| 5963| 3648|   6192|   425|            1716|       750|
|      2|     3| 6006|11093|  18881|  1159|            7425|      2098|
+-------+------+-----+-----+-------+------+----------------+----------+
only showing top 10 rows

### Count the number of rows in the dataframe

In [25]:
# Total number of rows (runs a Spark job over the whole dataset).
train.count()

440

### Use describe to see summary statistics of the DataFrame.

In [9]:
# count / mean / stddev / min / max for each column of the DataFrame.
train.describe().show()


+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|summary|           Channel|            Region|             Fresh|              Milk|          Grocery|           Frozen|  Detergents_Paper|        Delicassen|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+
|  count|               440|               440|               440|               440|              440|              440|               440|               440|
|   mean|1.3227272727272728| 2.543181818181818|12000.297727272728| 5796.265909090909|7951.277272727273|3071.931818181818|2881.4931818181817|1524.8704545454545|
| stddev|0.4680515694791137|0.7742724492301002|12647.328865076885|7380.3771745708445|9503.162828994346|4854.673332592367| 4767.854447904201|2820.1059373693965|
|    min|                 1|            

#### Count the NULL values in each column

In [26]:
# Count NULLs in every column with ONE aggregation (a single Spark job),
# instead of launching a separate filter+count() job per column, which
# re-scans the (uncached) CSV source once for each column.
from pyspark.sql import functions as F

names = train.schema.names
# F.when(cond, 1) is NULL when cond is false, and F.count skips NULLs,
# so each aggregate counts exactly the rows where that column is NULL.
null_counts = train.select(
    [F.count(F.when(F.col(name).isNull(), 1)).alias(name) for name in names]
).first().asDict()
for name in names:
    print(name + ": " + str(null_counts[name]))

Channel: 0
Region: 0
Fresh: 0
Milk: 0
Grocery: 0
Frozen: 0
Detergents_Paper: 0
Delicassen: 0


#### Use select to view a single column or a set of chosen columns.

In [27]:
# Project two columns and display the first five rows.
train.select(['Grocery', 'Frozen']).show(n=5)

+-------+------+
|Grocery|Frozen|
+-------+------+
|   7561|   214|
|   9568|  1762|
|   7684|  2405|
|   4221|  6404|
|   7198|  3915|
+-------+------+
only showing top 5 rows



#### Use filter to show only the records with Fresh sales greater than 50,000.

In [12]:
# Keep only the rows whose Fresh value exceeds 50000 (`where` is an alias of `filter`).
train.where(train['Fresh'] > 50000).show()

+-------+------+------+-----+-------+------+----------------+----------+
|Channel|Region| Fresh| Milk|Grocery|Frozen|Detergents_Paper|Delicassen|
+-------+------+------+-----+-------+------+----------------+----------+
|      1|     3| 56159|  555|    902| 10002|             212|      2916|
|      1|     3| 56082| 3504|   8906| 18028|            1480|      2498|
|      1|     3| 76237| 3473|   7102| 16538|             778|       918|
|      1|     3|112151|29627|  18148| 16745|            4948|      8550|
|      1|     1| 56083| 4563|   2124|  6422|             730|      3321|
|      1|     1| 53205| 4959|   7336|  3012|             967|       818|
|      1|     3| 68951| 4411|  12609|  8692|             751|      2406|
+-------+------+------+-----+-------+------+----------------+----------+



#### Create aggregates grouped by the Channel and Region variables.

In [28]:
# Average Fresh value per Channel; the result column is named "avg(Fresh)".
train.groupBy('Channel').mean('Fresh').show()

+-------+------------------+
|Channel|        avg(Fresh)|
+-------+------------------+
|      1|13475.560402684563|
|      2| 8904.323943661971|
+-------+------------------+



In [29]:
# Average Milk value per Channel; the result column is named "avg(Milk)".
train.groupBy('Channel').avg('Milk').show()

+-------+------------------+
|Channel|         avg(Milk)|
+-------+------------------+
|      1|3451.7248322147652|
|      2|           10716.5|
+-------+------------------+



In [30]:
# Average Fresh value for every (Channel, Region) pair; row order is not guaranteed.
train.groupBy('Channel', 'Region').mean('Fresh').show()

+-------+------+------------------+
|Channel|Region|        avg(Fresh)|
+-------+------+------------------+
|      2|     2| 7289.789473684211|
|      2|     3| 9831.504761904762|
|      1|     2|11650.535714285714|
|      1|     1|12902.254237288136|
|      1|     3|13878.052132701421|
|      2|     1|            5200.0|
+-------+------+------------------+



In [31]:
from pyspark.sql.functions import desc  # not used in this cell

# Same per-(Channel, Region) average as above, now ordered by both keys
# (`orderBy` is an alias of `sort`).
(
    train.groupBy('Channel', 'Region')
    .agg({'Fresh': 'mean'})
    .orderBy('Channel', 'Region', ascending=True)
    .show()
)

+-------+------+------------------+
|Channel|Region|        avg(Fresh)|
+-------+------+------------------+
|      1|     1|12902.254237288136|
|      1|     2|11650.535714285714|
|      1|     3|13878.052132701421|
|      2|     1|            5200.0|
|      2|     2| 7289.789473684211|
|      2|     3| 9831.504761904762|
+-------+------+------------------+



In [32]:
import pyspark.sql.functions as F

# Spending columns to summarise per (Channel, Region) group.
cols = ['Fresh', 'Milk', 'Grocery', 'Frozen']
# One mean aggregate per column; Spark labels each result "avg(<column>)".
exprs = [F.mean(column_name) for column_name in cols]
(
    train.groupBy('Channel', 'Region')
    .agg(*exprs)
    .orderBy('Channel', 'Region', ascending=True)
    .show()
)

+-------+------+------------------+------------------+------------------+------------------+
|Channel|Region|        avg(Fresh)|         avg(Milk)|      avg(Grocery)|       avg(Frozen)|
+-------+------+------------------+------------------+------------------+------------------+
|      1|     1|12902.254237288136|3870.2033898305085| 4026.135593220339| 3127.322033898305|
|      1|     2|11650.535714285714|           2304.25|            4395.5| 5745.035714285715|
|      1|     3|13878.052132701421|3486.9810426540284| 3886.734597156398| 3656.900473933649|
|      2|     1|            5200.0|           10784.0|18471.944444444445|2584.1111111111113|
|      2|     2| 7289.789473684211|  9190.78947368421|16326.315789473685| 1540.578947368421|
|      2|     3| 9831.504761904762|10981.009523809524|15953.809523809523|            1513.2|
+-------+------+------------------+------------------+------------------+------------------+



In [33]:
# Non-NULL counts of the same columns per (Channel, Region) group,
# labelled "count(<column>)" by Spark.
exprs = [F.count(column_name) for column_name in cols]
(
    train.groupBy('Channel', 'Region')
    .agg(*exprs)
    .orderBy('Channel', 'Region', ascending=True)
    .show()
)

+-------+------+------------+-----------+--------------+-------------+
|Channel|Region|count(Fresh)|count(Milk)|count(Grocery)|count(Frozen)|
+-------+------+------------+-----------+--------------+-------------+
|      1|     1|          59|         59|            59|           59|
|      1|     2|          28|         28|            28|           28|
|      1|     3|         211|        211|           211|          211|
|      2|     1|          18|         18|            18|           18|
|      2|     2|          19|         19|            19|           19|
|      2|     3|         105|        105|           105|          105|
+-------+------+------------+-----------+--------------+-------------+



#### Change the datatype of the Channel column to string.

In [19]:
from pyspark.sql.functions import col, column

# Replace the integer Channel column with its string representation
# (`astype` is an alias of `cast`); all other columns are carried over unchanged.
changedTypedf = train.withColumn("Channel", col("Channel").astype("string"))

#### Verify that Channel is now a string column.

In [20]:
# Print the schema of the converted DataFrame to confirm Channel's new type.
changedTypedf.printSchema()

root
 |-- Channel: string (nullable = true)
 |-- Region: integer (nullable = true)
 |-- Fresh: integer (nullable = true)
 |-- Milk: integer (nullable = true)
 |-- Grocery: integer (nullable = true)
 |-- Frozen: integer (nullable = true)
 |-- Detergents_Paper: integer (nullable = true)
 |-- Delicassen: integer (nullable = true)



In [21]:
# Preview the first 10 rows of the converted DataFrame.
changedTypedf.show(n=10, truncate=True)

+-------+------+-----+-----+-------+------+----------------+----------+
|Channel|Region|Fresh| Milk|Grocery|Frozen|Detergents_Paper|Delicassen|
+-------+------+-----+-----+-------+------+----------------+----------+
|      2|     3|12669| 9656|   7561|   214|            2674|      1338|
|      2|     3| 7057| 9810|   9568|  1762|            3293|      1776|
|      2|     3| 6353| 8808|   7684|  2405|            3516|      7844|
|      1|     3|13265| 1196|   4221|  6404|             507|      1788|
|      2|     3|22615| 5410|   7198|  3915|            1777|      5185|
|      2|     3| 9413| 8259|   5126|   666|            1795|      1451|
|      2|     3|12126| 3199|   6975|   480|            3140|       545|
|      2|     3| 7579| 4956|   9426|  1669|            3321|      2566|
|      1|     3| 5963| 3648|   6192|   425|            1716|       750|
|      2|     3| 6006|11093|  18881|  1159|            7425|      2098|
+-------+------+-----+-----+-------+------+----------------+----------+
only showing top 10 rows