#### Spark Session

In [1]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f66f262b490>


#### Create a Range of Numbers

This range of numbers represents a distributed collection. When
run on a cluster, each part of this range of numbers exists on a different executor.

In [2]:
myRange = spark.range(1000).toDF("number")

#### Transformations

Transformations are the core of how you express your business logic using Spark.

In [3]:
divisby2 = myRange.where("number % 2 = 0")

#### Actions

To trigger the computation

In [4]:
divisby2.count()

500

#### End to End Example

In [5]:
#auto infer schema and set the first line as headers 
flightData2015 = spark.read.option("inferSchema","true").option("header","true").csv("data/flight-data/2015-summary.csv")

In [6]:
#retrieve first 3 rows from file
flightData2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [7]:
#explain plan
flightData2015.sort("count").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#29 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#29 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=71]
      +- FileScan csv [DEST_COUNTRY_NAME#27,ORIGIN_COUNTRY_NAME#28,count#29] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/tiger01733/spark/data/flight-data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [8]:
#Set number of partitions for shuffle to 5
spark.conf.set("spark.sql.shuffle.partitions","5")

flightData2015.sort("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

#### DataFrames and SQL

In [9]:
#convert a Dataframe into an SQL Table/View
flightData2015.createOrReplaceTempView("flight_data_2015")

In [10]:
sqlquery = spark.sql("""SELECT DEST_COUNTRY_NAME, count(1) FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME""")

In [11]:
dataFramequery = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

In [12]:
#plan to be executed when an action is triggered
sqlquery.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 5), ENSURE_REQUIREMENTS, [plan_id=93]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/tiger01733/spark/data/flight-data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [13]:
dataFramequery.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 5), ENSURE_REQUIREMENTS, [plan_id=106]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#27] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/tiger01733/spark/data/flight-data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [14]:
spark.sql("SELECT max(count) from flight_data_2015").take(1)

[Row(max(count)=370002)]

In [15]:
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

[Row(max(count)=370002)]

#### Multi Transformation Query

In [16]:
maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME, sum(count) as destination_total from flight_data_2015 
                    GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5""")

In [17]:
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [18]:
from pyspark.sql.functions import desc

#sum up count followed by sort according to the summed up count
flightData2015.groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)","destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [19]:
flightData2015.groupBy("DEST_COUNTRY_NAME")\
    .sum("count")\
    .withColumnRenamed("sum(count)","destination_total")\
    .sort(desc("destination_total"))\
    .limit(5)\
    .explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- TakeOrderedAndProject(limit=5, orderBy=[destination_total#123L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#27,destination_total#123L])
   +- HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[sum(count#29)])
      +- Exchange hashpartitioning(DEST_COUNTRY_NAME#27, 5), ENSURE_REQUIREMENTS, [plan_id=276]
         +- HashAggregate(keys=[DEST_COUNTRY_NAME#27], functions=[partial_sum(count#29)])
            +- FileScan csv [DEST_COUNTRY_NAME#27,count#29] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/tiger01733/spark/data/flight-data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




### Chapter 3

#### Structured Streaming

In [20]:
staticdF = spark.read.format("csv").option("header","true").option("inferSchema","true").load("data/retail-data/by-day/*.csv")

                                                                                

In [21]:
from pyspark.sql.functions import window, column, desc, col

staticdF.selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost","InvoiceDate")\
    .groupBy(col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
    .sum("total_cost")\
    .sort(desc("sum(total_cost)"))\
    .show(5)



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 05:30...|          71601.44|
|      NULL|{2011-11-14 05:30...|          55316.08|
|      NULL|{2011-11-07 05:30...|          42939.17|
|      NULL|{2011-03-29 05:30...| 33521.39999999998|
|      NULL|{2011-12-08 05:30...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows



                                                                                

In [22]:
#set number of partitions for shuffle to 5
spark.conf.set("spark.sql.shuffle.partitions","5")

In [23]:
staticSchema = staticdF.schema

streamingdF = spark.readStream.schema(staticSchema)\
    .option("maxFilesPerTrigger",1)\
    .format("csv")\
    .option("header","true")\
    .load("data/retail-data/by-day/*.csv")

In [24]:
streamingdF.isStreaming

True

In [25]:
purchaseByCustomerPerHour = streamingdF\
    .selectExpr("CustomerId","(UnitPrice * Quantity) as total_cost","InvoiceDate")\
    .groupBy(col("CustomerId"),window(col("InvoiceDate"),"1 day"))\
    .sum("total_cost")

In [26]:
#write output to in memory table
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

24/01/28 18:19:04 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e3ea4c63-387e-4e6f-803c-45ece0e8cb36. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/28 18:19:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f66f0bcd9a0>

In [28]:
spark.sql("""
    SELECT * FROM customer_purchases
    ORDER BY 'sum(total_cost)' DESC
    """).show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   13329.0|{2010-12-08 05:30...|             304.2|
|   16250.0|{2010-12-01 05:30...|            226.14|
|   17460.0|{2010-12-01 05:30...|              19.9|
|   13491.0|{2010-12-02 05:30...|              98.9|
|   14594.0|{2010-12-01 05:30...|254.99999999999997|
+----------+--------------------+------------------+
only showing top 5 rows



In [28]:
#write output to console
purchaseByCustomerPerHour.writeStream\
    .format("console")\
    .queryName("customer_purchases_2")\
    .outputMode("complete")\
    .start()

24/01/28 18:18:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d5c8c0eb-185b-4ee9-9716-80ab94c861ba. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/28 18:18:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x7f67285bc880>

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12921.0|{2010-12-01 05:30...|             322.4|
|   16583.0|{2010-12-01 05:30...|233.45000000000002|
|   17897.0|{2010-12-01 05:30...|            140.39|
|   12748.0|{2010-12-01 05:30...|              4.95|
|   15350.0|{2010-12-01 05:30...|            115.65|
|   17809.0|{2010-12-01 05:30...|              34.8|
|   13747.0|{2010-12-01 05:30...|              79.6|
|   16250.0|{2010-12-01 05:30...|            226.14|
|   15983.0|{2010-12-01 05:30...|            440.89|
|   17511.0|{2010-12-01 05:30...|           1825.74|
|   14001.0|{2010-12-01 05:30...|            301.24|
|   17460.0|{2010-12-01 05:30...|              19.9|
|   18074.0|{2010-12-01 05:30...|             489.6|
|   12868.0|{2010-12-01 05:30...|             203.3|
| 

24/01/28 18:18:15 ERROR MicroBatchExecution: Query customer_purchases_2 [id = 91b41b9f-0acc-4409-811d-c1e79bc363a6, runId = 50a23245-43dc-45ad-ac96-836128e173ca] terminated with error
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:75)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:53)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gate

#### Machine Learning Library MLlib

In [29]:
#schema of dataframe
staticdF.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [30]:
from pyspark.sql.functions import date_format, col

#extract day of week from InvoiceDate
preppedDf = staticdF.na.fill(0)\
    .withColumn("day_of_week",date_format(col("InvoiceDate"),"EEEE"))\
    .coalesce(5)

In [31]:
#train test split
trainDf = preppedDf.where("InvoiceDate < '2011-07-01'")
testDf = preppedDf.where("InvoiceDate >= '2011-07-01'")

In [32]:
print(trainDf.count())
print(testDf.count())

                                                                                

245903




296006


                                                                                

In [33]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")

#One Hot encode the different days of the week
encoder = OneHotEncoder()\
    .setInputCol("day_of_week_index")\
    .setOutputCol("day_of_week_encoded")

#Create the feature vector
vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice","Quantity","day_of_week_encoded"])\
    .setOutputCol("features")

In [34]:
from pyspark.ml import Pipeline

#create the transformation/processing pipeline
transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

In [35]:
#fit
fittedPipeline = transformationPipeline.fit(trainDf)

#transform
transformedTrain = fittedPipeline.transform(trainDf)

                                                                                

In [36]:
transformedTrain.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [37]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

#train
kmeans = KMeans().setK(20)
kmModel = kmeans.fit(transformedTrain)

24/01/28 18:23:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/28 18:23:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [38]:
kmModel

KMeansModel: uid=KMeans_ee314fd0d1a9, k=20, distanceMeasure=euclidean, numFeatures=7

In [39]:
#evaluate train score
evaluator = ClusteringEvaluator()
print(evaluator.evaluate(kmModel.transform(transformedTrain)))

                                                                                

0.6255345183043269


                                                                                

In [40]:
#evaluate test score
transformedTest = fittedPipeline.transform(testDf)
print(evaluator.evaluate(kmModel.transform(transformedTest)))

                                                                                

0.46523507334368064


#### Parallelize Raw Data

In [41]:
from pyspark.sql import Row

spark.sparkContext.parallelize([Row(1),Row(2),Row(3)]).toDF()

DataFrame[_1: bigint]

### Chapter 4

In [42]:
df = spark.range(500).toDF('number')
df.select(df['number']+10)

DataFrame[(number + 10): bigint]

In [43]:
spark.range(2).collect()

[Row(id=0), Row(id=1)]

### Chapter 5

#### Schema

In [44]:
df = spark.read.format("json").load("data/flight-data/2015-summary.json")

In [45]:
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [46]:
df.schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

#### Manual Schema Definition

In [47]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

manualSchema = StructType([StructField("DEST_COUNTRY_NAME",StringType(),True),
                           StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
                           StructField("count",LongType(),False,metadata={"hello":"world"})])

df = spark.read.format("json").schema(manualSchema).load("data/flight-data/2015-summary.json")

In [48]:
df.schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', LongType(), True)])

#### Columns

In [49]:
from pyspark.sql.functions import col

col("Name")

Column<'Name'>

In [50]:
df['count']

Column<'count'>

In [51]:
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

#### Rows

In [52]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [53]:
from pyspark.sql import Row

sample_record = Row("Jack",1,None,False)

In [54]:
sample_record[0]

'Jack'

#### Creating DataFrames

In [55]:
df.createOrReplaceTempView("dfTable")

In [56]:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

sample_schema = StructType([StructField("Name",StringType(),True),
                            StructField("ID",LongType(),False),
                            StructField("Designation",StringType(),True)])

sample_row = Row("Jack",94561,"Black Ops Commander")
sample_df  = spark.createDataFrame([sample_row],sample_schema)
sample_df.show()

+----+-----+-------------------+
|Name|   ID|        Designation|
+----+-----+-------------------+
|Jack|94561|Black Ops Commander|
+----+-----+-------------------+



#### select and selectExpr

In [57]:
df.select("DEST_COUNTRY_NAME").show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [58]:
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [59]:
spark.sql("SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2").show()

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+



In [60]:
from pyspark.sql.functions import expr, col, column

df.select(expr("DEST_COUNTRY_NAME"),col("ORIGIN_COUNTRY_NAME"),column("count")).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [61]:
df.select(expr("DEST_COUNTRY_NAME AS destination").alias("destination_name")).show(2)

+----------------+
|destination_name|
+----------------+
|   United States|
|   United States|
+----------------+
only showing top 2 rows



In [62]:
df.selectExpr("DEST_COUNTRY_NAME AS destination").show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [63]:
df.selectExpr("*","(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) AS withinCountry").show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [64]:
spark.sql("SELECT *, (DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) AS withinCountry from dfTable LIMIT 2").show()

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+



In [65]:
df.selectExpr("avg(count)","count(DISTINCT(DEST_COUNTRY_NAME))").show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [66]:
from pyspark.sql.functions import lit

df.select(expr("*"),lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [67]:
df.withColumn("numberOne",lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [68]:
df.withColumn("withinCountry",expr("DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [69]:
df.withColumn("Destination",expr("DEST_COUNTRY_NAME")).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

In [70]:
df.withColumnRenamed("DEST_COUNTRY_NAME","Destination").columns

['Destination', 'ORIGIN_COUNTRY_NAME', 'count']

In [71]:
dfwithLongColName = df.withColumn("Long Column Name",expr("ORIGIN_COUNTRY_NAME"))

In [72]:
dfwithLongColName.selectExpr("`Long Column Name`").show(2)

+----------------+
|Long Column Name|
+----------------+
|         Romania|
|         Croatia|
+----------------+
only showing top 2 rows



In [73]:
dfwithLongColName.drop("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME")

DataFrame[count: bigint, Long Column Name: string]

In [74]:
df.withColumn("count2",expr("count").cast("long")).show(2)

+-----------------+-------------------+-----+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
+-----------------+-------------------+-----+------+
|    United States|            Romania|   15|    15|
|    United States|            Croatia|    1|     1|
+-----------------+-------------------+-----+------+
only showing top 2 rows



In [75]:
df.count()

256

In [76]:
df.where("count < 2").count()

25

In [77]:
df.where("count < 2").where(expr("ORIGIN_COUNTRY_NAME") != "Croatia").count()

24

In [78]:
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()



256



#### Random Samples

In [79]:
withReplacement = True
fraction = 0.25
seed = 0.5

df.sample(withReplacement, fraction, seed).count()



64

#### Random Splits

In [80]:
dataframe_split = df.randomSplit([0.3,0.7],seed=0.5)

dataframe_split[0].count(), dataframe_split[1].count()

(89, 167)

In [81]:
from pyspark.sql import Row
from pyspark.sql.functions import expr, col, column

newRows = [Row("India", "Japan", 1),Row("Japan","Sri Lanka",5)]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(newRows,df.schema)

df.union(newDF).where("count=1").where(col("ORIGIN_COUNTRY_NAME")!="United States").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|            India|              Japan|    1|
+-----------------+-------------------+-----+



#### Sorting

In [82]:
df.sort("count").show(5)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows



In [83]:
df.orderBy(expr("count").desc()).show(3)

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
+-----------------+-------------------+------+
only showing top 3 rows



#### Limit

In [84]:
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [85]:
df.orderBy(expr("count").desc()).limit(4).show()

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|             Canada|  8483|
|           Canada|      United States|  8399|
|    United States|             Mexico|  7187|
+-----------------+-------------------+------+





#### Repartition

In [86]:
df.rdd.getNumPartitions()

1

In [87]:
df.repartition(5,col("DEST_COUNTRY_NAME"))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [88]:
df.repartition(5,col("DEST_COUNTRY_NAME")).coalesce(2)

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

                                                                                

### Chapter 6

In [1]:
df = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("data/retail-data/by-day/2010-12-01.csv")

In [2]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [3]:
df.createOrReplaceTempView("dfTable")

In [4]:
from pyspark.sql.functions import lit

In [5]:
df.select(lit(5),lit("five"),lit(5.0))

DataFrame[5: int, five: string, 5.0: double]

In [6]:
from pyspark.sql.functions import col

df.where(col("InvoiceNo")!=536365)\
.select("InvoiceNo","description")\
.show(5,False)

+---------+-----------------------------+
|InvoiceNo|description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
|536366   |HAND WARMER RED POLKA DOT    |
|536367   |ASSORTED COLOUR BIRD ORNAMENT|
|536367   |POPPY'S PLAYHOUSE BEDROOM    |
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
only showing top 5 rows



In [7]:
df.where("InvoiceNo = 536365")\
.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [8]:
from pyspark.sql.functions import instr

priceFilter = col("UnitPrice")>600
descriptionFilter = instr(df.Description, "POSTAGE")>=1
df.where(df.StockCode.isin("DOT")).where(priceFilter|descriptionFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      NULL|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      NULL|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [9]:
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [10]:
from pyspark.sql.functions import expr,pow

fabricatedQuantity = pow(col("Quantity")*col("UnitPrice"),2)+5
df.select(col("CustomerId"),col("Quantity"),fabricatedQuantity.alias("RealQuantity")).show(5)

+----------+--------+------------------+
|CustomerId|Quantity|      RealQuantity|
+----------+--------+------------------+
|   17850.0|       6|239.08999999999997|
|   17850.0|       6|          418.7156|
|   17850.0|       8|             489.0|
|   17850.0|       6|          418.7156|
|   17850.0|       6|          418.7156|
+----------+--------+------------------+
only showing top 5 rows



In [11]:
from pyspark.sql.functions import round,bround
df.select(round(lit(2.5)),bround(lit(2.5))).show(1)

+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
|          3.0|           2.0|
+-------------+--------------+
only showing top 1 row



In [12]:
from pyspark.sql.functions import corr

df.select(corr("Quantity","UnitPrice")).show()

+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
|     -0.04112314436835551|
+-------------------------+



In [13]:
df.describe().show()

24/02/02 08:48:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                NULL| 8.627413127413128| 4.151946589446603|15661.388719512195|          NULL|
| stddev|72.89447869788873|17407.897548583845|                NULL|26.371821677029203|15.638659854603892|1854.4496996893627|          NULL|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [14]:
df.stat.approxQuantile("UnitPrice",[0.5,0.75],0.05)

[2.51, 4.21]

In [15]:
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show(5)

+--------------------+
|initcap(Description)|
+--------------------+
|White Hanging Hea...|
| White Metal Lantern|
|Cream Cupid Heart...|
|Knitted Union Fla...|
|Red Woolly Hottie...|
+--------------------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import upper,lower

df.select(col("Description"),lower(col("Description")),upper(lower(col("Description")))).show(2)

+--------------------+--------------------+-------------------------+
|         Description|  lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...|     WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern|      WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
only showing top 2 rows



In [17]:
from pyspark.sql.functions import ltrim, rtrim, trim, lpad, rpad

df.select(ltrim(lit("   HELLO   ")).alias("ltrim"),
          rtrim(lit("   HELLO   ")).alias("rtrim"),
          trim(lit("   HELLO   ")).alias("trim"),
          lpad(lit("HELLO"),3," ").alias("lpad"),
          rpad(lit("HELLO"),10," ").alias("rpad")
         ).show(2)

+--------+--------+-----+----+----------+
|   ltrim|   rtrim| trim|lpad|      rpad|
+--------+--------+-----+----+----------+
|HELLO   |   HELLO|HELLO| HEL|HELLO     |
|HELLO   |   HELLO|HELLO| HEL|HELLO     |
+--------+--------+-----+----+----------+
only showing top 2 rows



In [18]:
from pyspark.sql.functions import regexp_replace

regex_string = 'BLACK|WHITE|RED|GREEN|BLUE'
df.select(regexp_replace(col("Description"),regex_string,"COLOR").alias("color_clean"),col("Description")).show(2)

+--------------------+--------------------+
|         color_clean|         Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
only showing top 2 rows



In [19]:
from pyspark.sql.functions import translate

df.select(translate(col("Description"),'LEET','1337')).show(2)

+----------------------------------+
|translate(Description, LEET, 1337)|
+----------------------------------+
|              WHI73 HANGING H3A...|
|               WHI73 M37A1 1AN73RN|
+----------------------------------+
only showing top 2 rows



In [20]:
from pyspark.sql.functions import regexp_extract

regex_string = 'BLACK|WHITE|RED|GREEN|BLUE'
df.select(regexp_extract(col("Description"),regex_string,0).alias("color_clean"),col("Description")).show(2)

+-----------+--------------------+
|color_clean|         Description|
+-----------+--------------------+
|      WHITE|WHITE HANGING HEA...|
|      WHITE| WHITE METAL LANTERN|
+-----------+--------------------+
only showing top 2 rows



In [21]:
containsBlack = instr(col("Description"),"BLACK") >= 1
containsWhite = instr(col("Description"),"WHITE") >= 1
df.withColumn("hasSimpleColor",containsBlack|containsWhite)\
.where("hasSimpleColor")\
.select(col("Description")).show(3)

+--------------------+
|         Description|
+--------------------+
|WHITE HANGING HEA...|
| WHITE METAL LANTERN|
|RED WOOLLY HOTTIE...|
+--------------------+
only showing top 3 rows



In [22]:
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
    return locate(color_string.upper(), column)\
    .cast("boolean")\
    .alias("is_" + color_string)
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # has to a be Column type
df.select(*selectedColumns).where(expr("is_white OR is_red"))\
.select(col("Description"),col("is_white"),col("is_red")).show(3, False)

+----------------------------------+--------+------+
|Description                       |is_white|is_red|
+----------------------------------+--------+------+
|WHITE HANGING HEART T-LIGHT HOLDER|true    |false |
|WHITE METAL LANTERN               |true    |false |
|RED WOOLLY HOTTIE WHITE HEART.    |true    |true  |
+----------------------------------+--------+------+
only showing top 3 rows



In [23]:
from pyspark.sql.functions import current_date, current_timestamp

dateDf = spark.range(10)\
.withColumn("today",current_date())\
.withColumn("time",current_timestamp())

In [24]:
dateDf.printSchema()

root
 |-- id: long (nullable = false)
 |-- today: date (nullable = false)
 |-- time: timestamp (nullable = false)



In [25]:
dateDf.show(2)

+---+----------+--------------------+
| id|     today|                time|
+---+----------+--------------------+
|  0|2024-02-02|2024-02-02 08:48:...|
|  1|2024-02-02|2024-02-02 08:48:...|
+---+----------+--------------------+
only showing top 2 rows



In [26]:
from pyspark.sql.functions import date_add, date_sub

dateDf.select(date_add(col("today"),5),date_sub(col("today"),7)).show(1)

+------------------+------------------+
|date_add(today, 5)|date_sub(today, 7)|
+------------------+------------------+
|        2024-02-07|        2024-01-26|
+------------------+------------------+
only showing top 1 row



In [27]:
from pyspark.sql.functions import datediff, months_between, to_date

dateDf.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)

+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
|                       -7|
+-------------------------+
only showing top 1 row



In [28]:
dateDf.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))\
.select(months_between(col("start"), col("end"))).show(1)

+--------------------------------+
|months_between(start, end, true)|
+--------------------------------+
|                    -16.67741935|
+--------------------------------+
only showing top 1 row



In [29]:
spark.range(5).withColumn("date", lit("2017-01-01"))\
.select(to_date(col("date"))).show(1)

+-------------+
|to_date(date)|
+-------------+
|   2017-01-01|
+-------------+
only showing top 1 row



In [30]:
dateDf.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)

+-------------------+-------------------+
|to_date(2016-20-12)|to_date(2017-12-11)|
+-------------------+-------------------+
|               NULL|         2017-12-11|
+-------------------+-------------------+
only showing top 1 row



In [31]:
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))

In [32]:
from pyspark.sql.functions import to_timestamp

cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

+------------------------------+
|to_timestamp(date, yyyy-dd-MM)|
+------------------------------+
|           2017-11-12 00:00:00|
+------------------------------+



In [33]:
cleanDateDF.filter(col("date2")>lit("2017-12-12")).show()

+----------+----------+
|      date|     date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+



In [34]:
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show()

+---------------------------------+
|coalesce(Description, CustomerId)|
+---------------------------------+
|             WHITE HANGING HEA...|
|              WHITE METAL LANTERN|
|             CREAM CUPID HEART...|
|             KNITTED UNION FLA...|
|             RED WOOLLY HOTTIE...|
|             SET 7 BABUSHKA NE...|
|             GLASS STAR FROSTE...|
|             HAND WARMER UNION...|
|             HAND WARMER RED P...|
|             ASSORTED COLOUR B...|
|             POPPY'S PLAYHOUSE...|
|             POPPY'S PLAYHOUSE...|
|             FELTCRAFT PRINCES...|
|             IVORY KNITTED MUG...|
|             BOX OF 6 ASSORTED...|
|             BOX OF VINTAGE JI...|
|             BOX OF VINTAGE AL...|
|             HOME BUILDING BLO...|
|             LOVE BUILDING BLO...|
|             RECIPE BOX WITH M...|
+---------------------------------+
only showing top 20 rows



In [35]:
df.na.drop("all",subset=["StockCode","InvoiceNo"])

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [36]:
df.na.fill("all",subset=["StockCode","InvoiceNo"])

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [37]:
df.na.replace([""],["UNKNOWN"],"Description")

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [38]:
complexDF = df.selectExpr("(Description,InvoiceNo) as complex","*")

In [39]:
complexDF.select("complex.*").show(2)

+--------------------+---------+
|         Description|InvoiceNo|
+--------------------+---------+
|WHITE HANGING HEA...|   536365|
| WHITE METAL LANTERN|   536365|
+--------------------+---------+
only showing top 2 rows



In [40]:
from pyspark.sql.functions import split,size,array_contains

df.select(split(col("Description")," ")).show(2)

+-------------------------+
|split(Description,  , -1)|
+-------------------------+
|     [WHITE, HANGING, ...|
|     [WHITE, METAL, LA...|
+-------------------------+
only showing top 2 rows



In [41]:
df.select(split(col("Description")," ").alias("array_col")).selectExpr("array_col[0]").show(2)

+------------+
|array_col[0]|
+------------+
|       WHITE|
|       WHITE|
+------------+
only showing top 2 rows



In [42]:
df.select(size(split(col("Description")," "))).show(5)

+-------------------------------+
|size(split(Description,  , -1))|
+-------------------------------+
|                              5|
|                              3|
|                              5|
|                              6|
|                              5|
+-------------------------------+
only showing top 5 rows



In [43]:
df.select(array_contains(split(col("Description")," "),"WHITE")).show(5)

+------------------------------------------------+
|array_contains(split(Description,  , -1), WHITE)|
+------------------------------------------------+
|                                            true|
|                                            true|
|                                           false|
|                                           false|
|                                            true|
+------------------------------------------------+
only showing top 5 rows



In [44]:
from pyspark.sql.functions import create_map

df.select(create_map(col("Description"),col("InvoiceNo")).alias("complex_map")).show(2)

+--------------------+
|         complex_map|
+--------------------+
|{WHITE HANGING HE...|
|{WHITE METAL LANT...|
+--------------------+
only showing top 2 rows



In [45]:
df.select(create_map(col("Description"),col("InvoiceNo")).alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)

+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
|                            NULL|
|                          536365|
+--------------------------------+
only showing top 2 rows



In [46]:
jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

In [47]:
jsonDF.show()

+--------------------+
|          jsonString|
+--------------------+
|{"myJSONKey" : {"...|
+--------------------+



In [48]:
from pyspark.sql.functions import get_json_object, json_tuple

jsonDF.select(get_json_object(col("jsonString"),"$.myJSONKey.myJSONValue[1]").alias("column"),\
             json_tuple(col("jsonString"),"myJSONKey")).show(2)

+------+--------------------+
|column|                  c0|
+------+--------------------+
|     2|{"myJSONValue":[1...|
+------+--------------------+



In [49]:
from pyspark.sql.functions import from_json, to_json
from pyspark.sql.types import *

parseSchema = StructType((
StructField("InvoiceNo",StringType(),True),
StructField("Description",StringType(),True)))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

+--------------------+--------------------+
|  from_json(newJSON)|             newJSON|
+--------------------+--------------------+
|{536365, WHITE HA...|{"InvoiceNo":"536...|
|{536365, WHITE ME...|{"InvoiceNo":"536...|
+--------------------+--------------------+
only showing top 2 rows



In [50]:
udfExampleDF = spark.range(5).toDF("num")

def power3(double_value):
    return double_value ** 3

power3(2.0)

8.0

In [52]:
from pyspark.sql.functions import udf
power3udf = udf(power3)

In [53]:
udfExampleDF.select(power3udf(col("num"))).show(2)

+-----------+
|power3(num)|
+-----------+
|          0|
|          1|
+-----------+
only showing top 2 rows



In [54]:
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, DoubleType())

udfExampleDF.selectExpr("power3py(num)").show(2)

+-------------+
|power3py(num)|
+-------------+
|         NULL|
|         NULL|
+-------------+
only showing top 2 rows



In [56]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("data/retail-data/all/*.csv")\
.coalesce(5)

df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [57]:
df.count()

                                                                                

541909

In [62]:
from pyspark.sql.functions import count, countDistinct, approx_count_distinct

df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [61]:
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [63]:
df.select(approx_count_distinct("StockCode",0.05)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3804|
+--------------------------------+



In [64]:
from pyspark.sql.functions import first, last

df.select(first("StockCode"),last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [65]:
from pyspark.sql.functions import min, max

df.select(min("Quantity"),max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [66]:
from pyspark.sql.functions import sum, sumDistinct

df.select(sum("Quantity"),sumDistinct("Quantity")).show()



+-------------+----------------------+
|sum(Quantity)|sum(DISTINCT Quantity)|
+-------------+----------------------+
|      5176450|                 29310|
+-------------+----------------------+



In [67]:
from pyspark.sql.functions import mean

df.select(mean("Quantity")).show()

+----------------+
|   avg(Quantity)|
+----------------+
|9.55224954743324|
+----------------+



In [68]:
from pyspark.sql.functions import var_pop, stddev_pop, var_samp, stddev_samp

df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609056|47559.391409298754|  218.08095663447796|   218.08115785023418|
+------------------+------------------+--------------------+---------------------+



In [69]:
from pyspark.sql.functions import skewness, kurtosis

df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+



In [70]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085635685E-4|             1052.7280543902734|            1052.7260778741693|
+-------------------------+-------------------------------+------------------------------+



In [71]:
from pyspark.sql.functions import collect_set, collect_list

df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [72]:
df.groupBy("InvoiceNo","CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   536596|      NULL|    6|
|   537252|      NULL|    1|
|   538041|      NULL|    1|
|   537159|     14527|   28|
|   537213|     12748|    6|
|   538191|     15061|   16|
|  C539301|     13496|    1|
+---------+----------+-----+
only showing top 20 rows



In [73]:
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|   536597|  28|             28|
+---------+----+---------------+
only showing top 20 rows



In [75]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

In [76]:
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))

In [78]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec = Window\
.partitionBy("CustomerId", "date")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [79]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [80]:
from pyspark.sql.functions import dense_rank, rank

purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [85]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [90]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

### Chapter 7

In [1]:
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

In [2]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [3]:
person.show()

                                                                                

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



In [4]:
graduateProgram.show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [5]:
sparkStatus.show()

+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



In [6]:
joinExpression = person["graduate_program"] == graduateProgram['id']

person.join(graduateProgram,joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [7]:
person.join(graduateProgram,joinExpression,"outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|NULL|            NULL|            NULL|           NULL|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [8]:
person.join(graduateProgram,joinExpression,"left_outer").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [9]:
person.join(graduateProgram,joinExpression,"right_outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|NULL|            NULL|            NULL|           NULL|  2|Masters|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [10]:
person.join(graduateProgram,joinExpression,"left_semi").show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



In [11]:
person.crossJoin(graduateProgram).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  2|Masters|                EECS|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  2|Masters|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  0|Masters|School of Informa...|UC 



In [13]:
from pyspark.sql.functions import expr

person.withColumnRenamed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



In [14]:
from pyspark.sql.functions import broadcast

joinExpr = person["graduate_program"] == graduateProgram["id"]
person.join(broadcast(graduateProgram), joinExpr).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [graduate_program#10L], [id#24L], Inner, BuildRight, false
   :- Project [_1#0L AS id#8L, _2#1 AS name#9, _3#2L AS graduate_program#10L, _4#3 AS spark_status#11]
   :  +- Filter isnotnull(_3#2L)
   :     +- Scan ExistingRDD[_1#0L,_2#1,_3#2L,_4#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=758]
      +- Project [_1#16L AS id#24L, _2#17 AS degree#25, _3#18 AS department#26, _4#19 AS school#27]
         +- Filter isnotnull(_1#16L)
            +- Scan ExistingRDD[_1#16L,_2#17,_3#18,_4#19]




In [15]:
person.join(graduateProgram, joinExpr).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [graduate_program#10L], [id#24L], Inner
   :- Sort [graduate_program#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(graduate_program#10L, 200), ENSURE_REQUIREMENTS, [plan_id=789]
   :     +- Project [_1#0L AS id#8L, _2#1 AS name#9, _3#2L AS graduate_program#10L, _4#3 AS spark_status#11]
   :        +- Filter isnotnull(_3#2L)
   :           +- Scan ExistingRDD[_1#0L,_2#1,_3#2L,_4#3]
   +- Sort [id#24L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#24L, 200), ENSURE_REQUIREMENTS, [plan_id=790]
         +- Project [_1#16L AS id#24L, _2#17 AS degree#25, _3#18 AS department#26, _4#19 AS school#27]
            +- Filter isnotnull(_1#16L)
               +- Scan ExistingRDD[_1#16L,_2#17,_3#18,_4#19]




### Chapter 9

In [1]:
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("data/flight-data/2015-summary.csv")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [19]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("data/flight-data/my-tsv-file.tsv")

In [21]:
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("data/flight-data/2015-summary.json").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [22]:
csvFile.write.format("json").mode("overwrite").save("data/flight-data/my-json-file.json")

In [1]:
spark.read.format("parquet")\
.load("data/flight-data/parquet/2010-summary.parquet").show(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [4]:
csvFile.write.format("parquet").mode("overwrite")\
.save("data/flight-data/my-parquet-file.parquet")

In [5]:
spark.read.format("orc").load("data/flight-data/orc/2010-summary.orc").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [6]:
csvFile.write.format("orc").mode("overwrite").save("data/flight-data/my-json-file.orc")

In [13]:
spark.read.format("text").load("data/flight-data/csv/2010-summary.csv")\
.selectExpr("split(value, ',') as rows").show()

+--------------------+
|                rows|
+--------------------+
|[DEST_COUNTRY_NAM...|
|[United States, R...|
|[United States, I...|
|[United States, I...|
|[Egypt, United St...|
|[Equatorial Guine...|
|[United States, S...|
|[United States, G...|
|[Costa Rica, Unit...|
|[Senegal, United ...|
|[United States, M...|
|[Guyana, United S...|
|[United States, S...|
|[Malta, United St...|
|[Bolivia, United ...|
|[Anguilla, United...|
|[Turks and Caicos...|
|[United States, A...|
|[Saint Vincent an...|
|[Italy, United St...|
+--------------------+
only showing top 20 rows



In [14]:
csvFile.select("DEST_COUNTRY_NAME").write.text("data/flight-data/simple-text-file.txt")

In [2]:
csvFile.write.format("parquet").mode("overwrite")\
.bucketBy(10, "count").saveAsTable("bucketedFiles")

24/03/01 16:40:53 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/03/01 16:40:53 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/03/01 16:40:57 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/03/01 16:40:57 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore tiger01733@127.0.1.1
24/03/01 16:40:58 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
24/03/01 16:40:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/03/01 16:41:02 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.ma

### Chapter 10

In [3]:
spark.read.json("data/flight-data/json/2015-summary.json")\
.createOrReplaceTempView("some_sql_view") # DF => SQL

spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
""")\
.where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10")\
.count()

12