## Adapted from https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/ to work with Spark 3

In [1]:
# These should agree
%env PYSPARK_DRIVER_PYTHON /opt/anaconda3/bin/python3
%env PYSPARK_PYTHON /opt/anaconda3/bin/python3

env: PYSPARK_DRIVER_PYTHON=/opt/anaconda3/bin/python3
env: PYSPARK_PYTHON=/opt/anaconda3/bin/python3


In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
# Start a Spark session, or reuse the one already active in this process.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [4]:
from pyspark import SparkContext, SQLContext

# The session created above already owns the active SparkContext; reuse it.
sc = spark.sparkContext

In [5]:
# Legacy SQL entry point built on the SparkContext `sc`.
# NOTE(review): SQLContext is deprecated in Spark 3; the SparkSession `spark`
# provides createDataFrame() and sql() directly — confirm before relying on it.
sql_context = SQLContext(sc)

In [6]:
from pyspark.sql import Row

# Small in-memory sample of (name, age) pairs, distributed as an RDD of Rows.
l = [
    ('Ankit', 25),
    ('Jalfaizy', 22),
    ('saurabh', 20),
    ('Bala', 26),
]
rdd = sc.parallelize(l)
people = rdd.map(lambda pair: Row(name=pair[0], age=int(pair[1])))

In [7]:
# Build a DataFrame from the RDD of Rows; the schema (name, age) is inferred
# from the Row fields. Uses the SparkSession directly rather than the
# deprecated SQLContext wrapper, which delegates to the same method.
schema_people = spark.createDataFrame(people)

In [8]:
# Confirm that createDataFrame returned a Spark DataFrame.
type(schema_people)

pyspark.sql.dataframe.DataFrame

`SQLContext.load()` is deprecated, so the CSV files are read with `spark.read` below.

You can get train.csv and test.csv as a free download on https://datahack.analyticsvidhya.com/contest/black-friday/ if you agree not to redistribute.

In [9]:
# Load the training CSV: the first line is the header, column types are inferred.
train = spark.read.csv(
    "../../black_friday_data/train.csv",
    header=True,
    inferSchema=True,
)

In [10]:
# Load the test CSV with the same reader settings as the training file.
test = spark.read.csv(
    "../../black_friday_data/test.csv",
    header=True,
    inferSchema=True,
)

In [11]:
# Show the column names and the types inferred while reading the CSV.
train.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [12]:
# First five records, returned to the driver as a list of Row objects.
train.take(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

In [13]:
# Print the first two rows as a formatted table.
train.show(n=2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

In [14]:
# Same table with truncate=True passed explicitly; for this data the output
# is identical to the plain show(2) call above.
train.show(2, truncate=True)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

In [15]:
# Number of rows in each split: (train, test).
train.count(), test.count()

(550068, 233599)

In [16]:
# Column count and column names of the training set.
len(train.columns), train.columns

(12,
 ['User_ID',
  'Product_ID',
  'Gender',
  'Age',
  'Occupation',
  'City_Category',
  'Stay_In_Current_City_Years',
  'Marital_Status',
  'Product_Category_1',
  'Product_Category_2',
  'Product_Category_3',
  'Purchase'])

In [17]:
# Column count and column names of the test set (it has no 'Purchase' column).
len(test.columns), test.columns

(11,
 ['User_ID',
  'Product_ID',
  'Gender',
  'Age',
  'Occupation',
  'City_Category',
  'Stay_In_Current_City_Years',
  'Marital_Status',
  'Product_Category_1',
  'Product_Category_2',
  'Product_Category_3'])

In [18]:
# Summary statistics (count, mean, ...) for every column of the training set.
train.describe().show()

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [19]:
# Summary statistics for one string column: mean/stddev are null, while
# min/max are still reported.
train.describe('Product_ID').show()

+-------+----------+
|summary|Product_ID|
+-------+----------+
|  count|    550068|
|   mean|      null|
| stddev|      null|
|    min| P00000142|
|    max|  P0099942|
+-------+----------+



In [20]:
# Project two columns and preview the first five rows.
train.select('User_ID', 'Age').show(5)

+-------+----+
|User_ID| Age|
+-------+----+
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000002| 55+|
+-------+----+
only showing top 5 rows



In [21]:
# Number of distinct products in each split: (train, test).
(
    train.select('Product_ID').distinct().count(),
    test.select('Product_ID').distinct().count(),
)

(3631, 3491)

In [22]:
# Product IDs that occur in `test` but not in `train` (test minus train).
diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))

In [23]:
# subtract() already yields distinct rows, so the frame can be counted directly.
diff_cat_in_train_test.count()

46

In [24]:
# Product IDs that occur in `train` but not in `test` (train minus test).
diff_cat_in_test_train=train.select('Product_ID').subtract(test.select('Product_ID'))

In [25]:
# subtract() already yields distinct rows, so the frame can be counted directly.
diff_cat_in_test_train.count()

186

In [26]:
# Contingency table: one row per Age bucket, one count column per Gender value.
train.crosstab('Age', 'Gender').show()

+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+



In [27]:
# Unique (Age, Gender) combinations via dropDuplicates(), sorted for readability.
train.select('Age','Gender').dropDuplicates().sort('Age', 'Gender').show()

+-----+------+
|  Age|Gender|
+-----+------+
| 0-17|     F|
| 0-17|     M|
|18-25|     F|
|18-25|     M|
|26-35|     F|
|26-35|     M|
|36-45|     F|
|36-45|     M|
|46-50|     F|
|46-50|     M|
|51-55|     F|
|51-55|     M|
|  55+|     F|
|  55+|     M|
+-----+------+



In [28]:
# Same listing obtained with distinct(); the output matches the dropDuplicates() cell.
train.select('Age','Gender').distinct().sort('Age', 'Gender').show()

+-----+------+
|  Age|Gender|
+-----+------+
| 0-17|     F|
| 0-17|     M|
|18-25|     F|
|18-25|     M|
|26-35|     F|
|26-35|     M|
|36-45|     F|
|36-45|     M|
|46-50|     F|
|46-50|     M|
|51-55|     F|
|51-55|     M|
|  55+|     F|
|  55+|     M|
+-----+------+



In [29]:
# Total row count versus the row count that remains after dropna().
train.count(), train.dropna().count()

(550068, 166821)

In [30]:
# Replace nulls with -1 (visible in the Product_Category_2/3 columns) and preview two rows.
train.na.fill(-1).show(2)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
only showing top 2 rows

In [31]:
# Count the rows whose Purchase value exceeds 15000.
train.where(F.col('Purchase') > 15000).count()

110523

In [32]:
# Mean Purchase per Age bucket, listed in Age order.
(
    train.groupBy('Age')
    .agg({'Purchase': 'mean'})
    .orderBy('Age')
    .show()
)

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
| 0-17|8933.464640444974|
|18-25|9169.663606261289|
|26-35|9252.690632869888|
|36-45|9331.350694917874|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|  55+|9336.280459449405|
+-----+-----------------+



In [33]:
# Row count per Age bucket, largest bucket first.
(
    train.groupBy('Age')
    .count()
    .orderBy('count', ascending=False)
    .show()
)

+-----+------+
|  Age| count|
+-----+------+
|26-35|219587|
|36-45|110013|
|18-25| 99660|
|46-50| 45701|
|51-55| 38501|
|  55+| 21504|
| 0-17| 15102|
+-----+------+



In [34]:
# Two 20% samples drawn without replacement; only the seed differs between them.
t1, t2 = (
    train.sample(withReplacement=False, fraction=0.2, seed=seed)
    for seed in (42, 43)
)
t1.count(), t2.count()

(110376, 110393)

In [35]:
# Drop down to the RDD API: pair every Row with the constant 1, then peek at five pairs.
train.select('User_ID').rdd.map(lambda row: (row, 1)).take(5)

[(Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000001), 1),
 (Row(User_ID=1000002), 1)]

In [36]:
# Five rows with the highest Purchase values.
train.sort(F.desc('Purchase')).show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1001474| P00052842|     M|26-35|         4|            A|                         2|             1|                10|                15|              null|   23961|
|1003160| P00052842|     M|26-35|        17|            C|                         3|             0|                10|                15|              null|   23961|
|1002272| P00052842|     M|26-35|         0|            C|                         1|             0|                10|                15|              null|   23961

In [37]:
# Preview Purchase next to a derived column holding half of its value.
train.select(
    'Purchase',
    (train.Purchase / 2.0).alias('Purchase_new'),
).show(5)

+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
|    7969|      3984.5|
+--------+------------+
only showing top 5 rows



In [38]:
# `train` itself is unchanged by the previous cell: there is no Purchase_new column.
train.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase']

In [39]:
# Keep the derived column by binding the new DataFrame to its own name.
train_augmented = train.withColumn('Purchase_new', F.col('Purchase') / 2.0)

In [40]:
# The augmented frame carries the extra Purchase_new column at the end.
train_augmented.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Purchase_new']

In [41]:
# Fetch the first two rows as Row objects to inspect the new column's values.
train_augmented.take(2)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370, Purchase_new=4185.0),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200, Purchase_new=7600.0)]

In [42]:
# New frame without the 'Age' column; `test` itself is left as it was.
test_diminished = test.drop('Age')

In [43]:
# Verify that 'Age' is no longer among the columns.
test_diminished.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

In [44]:
# Recompute the product IDs present in `test` but absent from `train`
# (same expression as in cell 22) as the starting point for the UDF example.
diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))

In [45]:
# subtract() already yields distinct rows, so the frame can be counted directly.
diff_cat_in_train_test.count()

46

In [46]:
# Bring the unseen product IDs to the driver as a plain Python list of strings.
not_found_cat = [row[0] for row in diff_cat_in_train_test.collect()]

In [47]:
# Should equal the distinct count computed on the DataFrame above.
len(not_found_cat)

46

In [48]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Snapshot the unseen product IDs into a frozenset: the UDF runs once per row,
# and a hash lookup avoids scanning the whole list on every call.
_not_found_lookup = frozenset(not_found_cat)

# UDF that maps any product ID listed in `not_found_cat` to the placeholder
# '-1' and returns every other value (including null) unchanged.
F1 = udf(lambda x: '-1' if x in _not_found_lookup else x, StringType())

In [49]:
# Single-column frame holding the remapped product IDs of the test set.
k = test.select(F1(test["Product_ID"]).alias("NEW_Product_ID"))

In [50]:
# First two summary rows (count and mean) for the remapped column.
k.describe().take(2)

[Row(summary='count', NEW_Product_ID='233599'),
 Row(summary='mean', NEW_Product_ID='-1.0')]

In [51]:
# Remapped test product IDs that have no match among the train product IDs.
# `k` has NEW_Product_ID as its only column, and subtract() already
# de-duplicates, so no extra select()/distinct() is needed.
diff_cat_in_train_test = k.subtract(train.select('Product_ID'))
diff_cat_in_train_test.count()

1

In [52]:
# List the remaining unmatched value(s); subtract() has already de-duplicated them.
diff_cat_in_train_test.collect()

[Row(NEW_Product_ID='-1')]

In [53]:
# Expose `train` to Spark SQL under the name `train_table`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
train.createOrReplaceTempView('train_table')

In [54]:
# Query the registered view through the SparkSession rather than the
# deprecated SQLContext wrapper.
spark.sql('select Product_ID from train_table').show(5)

+----------+
|Product_ID|
+----------+
| P00069042|
| P00248942|
| P00087842|
| P00085442|
| P00285442|
+----------+
only showing top 5 rows



In [55]:
# Highest Purchase per Age bucket, expressed in SQL and run through the
# SparkSession rather than the deprecated SQLContext wrapper.
spark.sql('select Age, max(Purchase) from train_table group by Age').show()

+-----+-------------+
|  Age|max(Purchase)|
+-----+-------------+
|18-25|        23958|
|26-35|        23961|
| 0-17|        23955|
|46-50|        23960|
|51-55|        23960|
|36-45|        23960|
|  55+|        23960|
+-----+-------------+

