In [55]:
!head /data/SparkDefinitive/data/flight-data/csv/2015-summary.csv

53076.97s - pydevd: Sending message related to process being replaced timed-out after 5 seconds
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40


In [4]:
import pyspark
from pyspark.sql import SparkSession

In [11]:
spark = SparkSession.builder.appName('Definitive').getOrCreate()

In [16]:
flightData2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("/data/SparkDefinitive/data/flight-data/csv/2015-summary.csv")

In [39]:
flightData2015.show(3)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [22]:
flightData2015.sort('count').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=67]
      +- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/SparkDefinitive/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>




In [25]:
spark.conf.set('spark.sql.shuffle.partitions', '5')
flightData2015.sort('count').take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [26]:
flightData2015.createOrReplaceTempView('flight_data_2015')

In [29]:
   sqlway = spark.sql("""
   SELECT DEST_COUNTRY_NAME, count(1)
   FROM flight_data_2015
   GROUP BY DEST_COUNTRY_NAME
   """)

In [30]:
dataFrameWay = flightData2015\
    .groupBy("DEST_COUNTRY_NAME")\
    .count()

In [32]:
sqlway.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=146]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/data/SparkDefinitive/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#17, 5), ENSURE_REQUIREMENTS, [plan_id=159]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#17], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFile

In [34]:
spark.sql('SELECT max(count) FROM flight_data_2015').show(1)

+----------+
|max(count)|
+----------+
|    370002|
+----------+



In [37]:
from pyspark.sql.functions import max
flightData2015.select(max('count')).show(1)

+----------+
|max(count)|
+----------+
|    370002|
+----------+



In [38]:
maxSql = spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY sum(count) DESC
    LIMIT 5
""")
maxSql.show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [42]:
from pyspark.sql.functions import desc
flightData2015\
    .groupBy('DEST_COUNTRY_NAME')\
    .sum('count')\
    .withColumnRenamed('sum(count)', 'destination_total')\
    .sort(desc('destination_total'))\
    .limit(5)\
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [46]:
 !./bin/spark-submit \
    --master local \
    ./examples/src/main/python/pi.py 10

41619.46s - pydevd: Sending message related to process being replaced timed-out after 5 seconds
/bin/bash: line 1: ./bin/spark-submit: No such file or directory


In [73]:
spark.stop()

Structured Streaming

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('streaming').getOrCreate()

In [4]:
staticDataFrame = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/data/SparkDefinitive/data/retail-data/by-day/*.csv")

In [5]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [7]:
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")\
    .show(3)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 00:00...|            -37.6|
|   14126.0|{2011-11-29 00:00...|643.6300000000001|
|   13500.0|{2011-11-16 00:00...|497.9700000000001|
+----------+--------------------+-----------------+
only showing top 3 rows



In [8]:
#let’s take a look at the streaming code!
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("/data/retail-data/by-day/*.csv")

#Now we can see whether our DataFrame is streaming:
streamingDataFrame.isStreaming
#returns true

True

In [9]:
purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")\
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")

#This is still a lazy operation, so we will need to call a streaming action to start the
#execution of this data flow

In [10]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7efd9c046390>

In [11]:
"""When we start the stream, we can run queries against it to debug what our result will
look like if we were to write this out to a production sink:"""
spark.sql("""
    SELECT *
    FROM customer_purchases
    ORDER BY `sum(total_cost)` DESC
    """)\
    .show(5)


+----------+------+---------------+
|CustomerId|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



Machine Learning & Advanced Analytics

In [12]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [14]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn('day_of_week', date_format(col('InvoiceDate'), 'EEEE'))\
    .coalesce(5)

In [15]:
trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

In [17]:
trainDataFrame.count()

245903

In [18]:
testDataFrame.count()

296006

In [19]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")

In [20]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
    .setInputCol("day_of_week_index")\
    .setOutputCol("day_of_week_encoded")

In [21]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
    .setOutputCol("features")

In [22]:
"""we’ll set this up into a pipeline so that any future data we need to transform can go
through the exact same process:"""

from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

In [24]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)
transformedTraining = fittedPipeline.transform(trainDataFrame)
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [26]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
    .setK(20)\
    .setSeed( 1)

In [27]:
kmModel = kmeans.fit(transformedTraining)

In [1]:
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)

NameError: name 'fittedPipeline' is not defined

Lower-Level APIs

In [5]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('lowerlevel').getOrCreate()

In [6]:
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

DataFrame[_1: bigint]