# Installation:


conda create -n pysparklesson python=3.9  
conda activate pysparklesson  
conda install -c conda-forge pyspark  
pip install jupyter  
jupyter notebook  

In [1]:
from pyspark.sql import SparkSession,Row, functions as sql_f
from pyspark import SparkContext, SQLContext,SparkConf

# 
from pyspark.sql.functions import pandas_udf, PandasUDFType

# schema 
from pyspark.sql.types import *

import pandas as pd
import numpy as np

In [2]:
# define spark context (boilerplate)

In [3]:
conf = SparkConf().setMaster("local[*]").set("spark.sql.execution.arrow.pyspark.enabled", "true")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/02 21:35:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Load csv

In [4]:
train = spark.read.csv(path="data/train.csv",
                      header=True,
                      inferSchema=True)

In [5]:
train.show(n=10)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [8]:
train.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



# Shape

In [9]:
train.count()

550068

In [10]:
len(train.columns)

12

# Filtering

In [12]:
train_filtered = train.filter(train.Purchase > 100)

In [23]:
train_filtered_2 = train.where(train.Purchase > 100)

In [24]:
assert train_filtered_2.count() == train_filtered.count()

# Select columns, sum, rename

In [19]:
train_filtered = train_filtered.select("User_ID",
                                       "Purchase"
                                      )

In [25]:
train_filtered = train.where(train.Purchase > 100).select("Purchase")

In [32]:
train_filtered = train.where(train.Purchase > 100).\
                 select(sql_f.sum("Purchase").\
                        alias("total_purchase_value"))

In [33]:
train_filtered.show()

+--------------------+
|total_purchase_value|
+--------------------+
|          5095753364|
+--------------------+



In [42]:
total_distinct_users = train.where(train.Purchase > 100).\
                       select(sql_f.countDistinct("User_ID").alias("total_unique_users"))

In [43]:
total_distinct_users.collect()[0]["total_unique_users"]

5891

In [38]:
train.where(train.Purchase > 100).\
                       select("User_ID").distinct().count()

5891

# Group by

In [46]:
total_product_category_1_purchase = train.\
                                    groupBy("Product_Category_1").\
                                    agg(sql_f.sum("Purchase").alias("total_purchase"),
                                       sql_f.countDistinct("Purchase").alias("total_unique_customers"))

In [47]:
total_product_category_1_purchase.show()



+------------------+--------------+----------------------+
|Product_Category_1|total_purchase|total_unique_customers|
+------------------+--------------+----------------------+
|                12|       5331844|                   334|
|                 1|    1910013754|                  3795|
|                13|       4008601|                   189|
|                 6|     324150302|                  3417|
|                16|     145120612|                  3288|
|                 3|     204084713|                  2528|
|                20|        944727|                   120|
|                 5|     941835229|                  1715|
|                19|         59378|                    15|
|                15|      92969042|                  3073|
|                 9|       6370324|                   394|
|                17|       5878699|                   487|
|                 4|      27380488|                   685|
|                 8|     854318799|                  194

                                                                                

# Dropping duplicates

In [51]:
train.select("Age").dropDuplicates().\
show()

+-----+
|  Age|
+-----+
|18-25|
|26-35|
| 0-17|
|46-50|
|51-55|
|36-45|
|  55+|
+-----+



# Joins

In [53]:
dummy1 = pd.DataFrame()
dummy1["key"] = np.arange(1000)
dummy1["values"] = np.random.randn(1000)

In [54]:
dummy1

Unnamed: 0,key,values
0,0,-1.668995
1,1,0.666190
2,2,-2.285035
3,3,-1.552433
4,4,-1.456636
...,...,...
995,995,0.136001
996,996,-0.018284
997,997,-0.458824
998,998,-0.572730


In [55]:
dummy2 = pd.DataFrame()
dummy2['key'] = np.random.choice(np.arange(1000), size = 500, replace = False)

In [56]:
dummy2

Unnamed: 0,key
0,221
1,861
2,988
3,64
4,705
...,...
495,248
496,565
497,141
498,278


In [57]:
dummy1_spark = spark.createDataFrame(dummy1)
dummy2_spark = spark.createDataFrame(dummy2)

In [58]:
dummy1_spark.show()

+---+--------------------+
|key|              values|
+---+--------------------+
|  0| -1.6689949703834375|
|  1|   0.666190391886411|
|  2|  -2.285035460887746|
|  3| -1.5524329265813483|
|  4| -1.4566358240091672|
|  5|-0.01169638204669...|
|  6|  1.7401948062771198|
|  7|  0.6543581974243693|
|  8|-0.22059990615093994|
|  9|-0.11917726608075765|
| 10|-0.26912641924405917|
| 11| -0.4758379553526466|
| 12|-0.39606116887380227|
| 13|-0.45864051272897044|
| 14| -0.9014999352102138|
| 15|  0.2982036641139408|
| 16|  0.3029049929299428|
| 17| -0.9036303913638039|
| 18|  0.7922917722716324|
| 19|  0.6934692625288705|
+---+--------------------+
only showing top 20 rows



In [59]:
join_result = dummy2_spark.join(dummy1_spark,
                               on = ["key"],
                               how= "inner")

In [60]:
join_result.show()

+---+--------------------+
|key|              values|
+---+--------------------+
| 65|  0.3816356895971067|
| 19|  0.6934692625288705|
| 54| -1.5377008109011419|
|112| -2.3671334646572912|
|  7|  0.6543581974243693|
| 94|-0.12883945944592798|
|110| -0.0818082529668762|
| 32| -0.7579084227033176|
|119|  1.2465331377415017|
| 98|-0.13915770036165706|
|116|   1.576481604745302|
| 25|  0.5149509068126464|
| 95|  1.5232720898696541|
| 68| -1.2524366855150106|
| 72| -2.1719911710033775|
| 87|  -0.807569081302329|
|107|  -0.555257633784366|
|  9|-0.11917726608075765|
| 27|-0.27775435904174406|
| 51|  1.7092428901099426|
+---+--------------------+
only showing top 20 rows



# Drop na

In [61]:
train.dropna().count()

166821

# Fill na

In [63]:
train_filled_na = train.select("Purchase").fillna(-1)

In [65]:
train_filled_na.filter(train_filled_na.Purchase == -1).show()

+--------+
|Purchase|
+--------+
+--------+



# Sample

In [66]:
train.sample(withReplacement=None,fraction=0.1).count()

55322

# Adding a column

In [69]:
train = train.withColumn("new_column", train.Purchase / 2.0)

In [70]:
train.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|new_column|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|    4185.0|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|    7600.0|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|          

# UDF

https://www.databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html?utm_source=hootsuite&utm_medium=&utm_term=&utm_content=&utm_campaign=

In [5]:
# mean group by agg
train.groupBy('User_ID').agg(sql_f.mean(train.Purchase)).show(n=10)

[Stage 2:>                                                          (0 + 7) / 7]

+-------+------------------+
|User_ID|     avg(Purchase)|
+-------+------------------+
|1000149| 11302.68263473054|
|1000190| 9612.111111111111|
|1000636| 12100.44642857143|
|1001043|11461.695652173914|
|1001129|10781.338461538462|
|1001139|10564.731481481482|
|1001601|13003.398058252427|
|1002605| 10818.85436893204|
|1003031|            8917.2|
|1003373|10867.411764705883|
+-------+------------------+
only showing top 10 rows





In [6]:
"""
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

"""

schema = StructType([StructField("User_ID", IntegerType(), True),
                    StructField("output", DoubleType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def just_some_custom_mean(df):
    """
    Parameters
    ----------
    df: pd.DataFrame
    """
    key = df["User_ID"].unique()[0]
    output = df["Purchase"].mean()
    return pd.DataFrame({"User_ID":[key],
                        "output":[output]})

In [7]:
# train_pandas.groupby(["User_ID"]).apply(just_some_custom_mean) - not distributed
result = train.groupBy("User_ID").apply(just_some_custom_mean)



In [8]:
result.show()

[Stage 5:>                                                          (0 + 1) / 1]

+-------+------------------+
|User_ID|            output|
+-------+------------------+
|1000005| 7745.292452830188|
|1000042|          11864.28|
|1000049|12206.066666666668|
|1000055| 9594.285714285714|
|1000060| 7778.583333333333|
|1000062| 8913.539007092199|
|1000081|12250.529411764706|
|1000083|  9999.22641509434|
|1000084| 12596.59090909091|
|1000088| 9980.242424242424|
|1000089| 8019.714285714285|
|1000092| 7057.910313901345|
|1000101|17246.439393939392|
|1000102| 7362.264367816092|
|1000121|14303.268292682927|
|1000123|  9196.28409090909|
|1000127|          11712.61|
|1000133| 8622.193548387097|
|1000138|16031.027027027027|
|1000140| 9972.242424242424|
+-------+------------------+
only showing top 20 rows



23/05/02 21:43:23 WARN JavaUtils: Attempt to delete using native Unix OS command failed for path = /private/var/folders/z9/_l65135549l_v6k4d3tz369r0000gn/T/blockmgr-09d67d10-a364-48df-bc31-95393303ff9f. Falling back to Java IO way
java.io.IOException: Failed to delete: /private/var/folders/z9/_l65135549l_v6k4d3tz369r0000gn/T/blockmgr-09d67d10-a364-48df-bc31-95393303ff9f
	at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingUnixNative(JavaUtils.java:177)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:113)
	at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1(DiskBlockManager.scala:368)
	at org.apache.spark.storage.DiskBlockManager.$anonfun$doStop$1$adapted(DiskBlockManager.scala:364)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.Indexe

# checkpoint/cache

In [None]:
train.checkpoint() # save and execute all actions 