In [1]:
import findspark
import os

# Local Spark / Java install locations -- adjust both for your machine.
# A raw string keeps the Windows backslashes literal: in a plain string "\P",
# "\S" and "\s" are invalid escape sequences (DeprecationWarning since
# Python 3.6, SyntaxWarning since 3.12) that only happen to survive unchanged.
spark_location = r'C:\Program Files\Spark\spark-2.4.5-bin-hadoop2.7'  # Set your own
# NOTE(review): this is a Linux-style JVM path while spark_location is a
# Windows path; both must point at installs on the same machine.
java8_location = '/usr/lib/jvm/java-8-openjdk-amd64'  # Set your own

# JAVA_HOME selects the Java install used to launch Spark's JVM; findspark then
# makes the pyspark package shipped under spark_location importable.
os.environ['JAVA_HOME'] = java8_location
findspark.init(spark_home=spark_location)

In [3]:
# findspark was already initialised in the first cell with an explicit
# spark_home. NOTE(review): each findspark.init() call prepends Spark's python
# paths to sys.path again, so only re-initialise when pyspark is still not
# importable (e.g. the first cell's init was skipped).
import importlib.util

if importlib.util.find_spec("pyspark") is None:
    findspark.init()


In [4]:
import pyspark

### Intro

In [22]:
from pyspark.sql.functions import max

In [23]:
from pyspark.sql import SparkSession

# Create the SparkSession, or return the one already running in this kernel
# (getOrCreate), so re-running the cell is harmless.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [24]:
# Load the 2015 flight summary. The header row supplies the column names and
# inferSchema lets Spark pick column types (e.g. `count` comes back as an int).
# TODO: replace the absolute local path with a configurable data directory.
flightData2015 = spark.read.csv(
    r"C:\Users\y.li\OneDrive\Study\Spark\data\2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [26]:
# First four rows, returned to the driver as a list of Row objects.
flightData2015.head(4)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)]

In [14]:
# Number of rows per destination country, via the DataFrame API.
# (This counts rows, it does not sum the `count` column.)
dataFrameWay = flightData2015.groupBy("DEST_COUNTRY_NAME").count()

In [15]:
# Print only the physical plan: a partial HashAggregate per partition, a
# hash-partitioned shuffle (Exchange), then the final HashAggregate.
dataFrameWay.explain(False)

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#888], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#888, 200)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#888], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#888] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/y.li/OneDrive/Study/Spark/data/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


In [16]:
# Largest single `count` value in the data. Use the qualified Spark function:
# a bare `max` only refers to pyspark.sql.functions.max because an earlier cell
# imported it over Python's built-in max.
from pyspark.sql import functions as F

flightData2015.select(F.max("count")).take(1)

[Row(max(count)=370002)]

In [29]:
# Top five destination countries by total `count`, expressed in Spark SQL
# against a temporary view of the flight data.
flightData2015.createOrReplaceTempView("fd1")

topDestinationsQuery = """
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM fd1
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
"""
maxSql = spark.sql(topDestinationsQuery)
maxSql.collect()

[Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

In [None]:
# Run the per-customer aggregation as a streaming query. Its complete result
# table is kept in memory under the name "customer_purchases", which can be
# queried with spark.sql once the query has produced output.
#
# NOTE(review): `purchaseByCustomerPerHour` is not defined anywhere in this
# notebook, and this cell (In [None]) was never executed. It presumably should
# be a streaming aggregation built from `streamingDataFrame`, which is only
# defined in the Streaming section below. Define it there and move this cell
# after it.
if "purchaseByCustomerPerHour" not in globals():
    raise NameError(
        "name 'purchaseByCustomerPerHour' is not defined: build the streaming "
        "aggregation (e.g. from streamingDataFrame) before starting this query"
    )

# Keep a handle on the StreamingQuery so it can be inspected or stopped later
# (e.g. customerPurchasesQuery.stop()).
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)
customerPurchasesQuery

### Streaming

In [5]:
from pyspark.sql import SparkSession

# Create the SparkSession, or return the one already running in this kernel
# (getOrCreate), so this section can also be run on its own.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

In [6]:
# Batch read of every daily retail CSV. The header row gives column names and
# inferSchema picks types. The resulting schema is captured so the streaming
# reader below can be given it explicitly.
staticDataFrame = spark.read.csv(
    r"C:\Users\y.li\OneDrive\Study\Spark\Spark-The-Definitive-Guide-master\Spark-The-Definitive-Guide-master\data\retail-data\by-day\*.csv",
    header=True,
    inferSchema=True,
)
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [8]:
# Streaming view over the same daily files. It reuses the batch schema and
# picks up at most one new file per micro-batch trigger.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1)
    .csv(
        r"C:\Users\y.li\OneDrive\Study\Spark\Spark-The-Definitive-Guide-master\Spark-The-Definitive-Guide-master\data\retail-data\by-day\*.csv",
        header=True,
    )
)

In [7]:
# Display the schema inferred by the batch read (the same object as staticSchema).
staticDataFrame.schema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,TimestampType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

In [9]:
# Print the inferred schema of the batch retail data as a tree.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [10]:
from pyspark.sql.functions import col, date_format

# Feature prep:
# - fill nulls with 0 (NOTE(review): this also turns missing CustomerID values,
#   a double column, into 0),
# - add the weekday name of each invoice ("EEEE" = full day name, e.g. "Monday"),
# - shrink to 5 partitions.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [11]:
# Check the prepared schema. day_of_week has been added, and after na.fill(0)
# UnitPrice and CustomerID are reported as non-nullable.
preppedDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = false)
 |-- CustomerID: double (nullable = false)
 |-- Country: string (nullable = true)
 |-- day_of_week: string (nullable = true)



In [15]:
# Time-based split: invoices before SPLIT_DATE train the model, and invoices on
# or after it are held out for testing. One shared constant keeps the two
# predicates exact complements (rows with a null InvoiceDate match neither).
SPLIT_DATE = "2011-07-01"

trainDataFrame = preppedDataFrame.where("InvoiceDate < '{}'".format(SPLIT_DATE))
testDataFrame = preppedDataFrame.where("InvoiceDate >= '{}'".format(SPLIT_DATE))

In [16]:
# Display the DataFrame's repr: its column names and types, not its rows.
trainDataFrame

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string]

In [17]:
# Number of rows in the training split (an action, so it runs a Spark job).
trainDataFrame.count()

245903

In [18]:
# Number of rows in the held-out test split (an action, so it runs a Spark job).
testDataFrame.count()

296006

In [19]:
from pyspark.ml.feature import StringIndexer

# Map each weekday name to a numeric index (learned when the pipeline is fit).
indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")

In [20]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the weekday index into a vector column.
# NOTE(review): in Spark 2.4 this OneHotEncoder is deprecated in favour of
# OneHotEncoderEstimator (renamed back to OneHotEncoder in 3.0) -- confirm
# against the Spark version in use.
encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_encoded")

In [21]:
from pyspark.ml.feature import VectorAssembler

# Concatenate price, quantity and the encoded weekday into one "features" vector.
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

In [22]:
from pyspark.ml import Pipeline

# Chain the feature stages: index the weekday name, one-hot encode the index,
# then assemble the final feature vector.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [23]:
# Fit the feature pipeline on the training split only, so the StringIndexer's
# label-to-index mapping is learned without looking at test data.
fittedPipeline = transformationPipeline.fit(dataset=trainDataFrame)

In [24]:
# Apply the fitted pipeline to the training split. This adds the
# day_of_week_index, day_of_week_encoded and features columns.
transformedTraining = fittedPipeline.transform(dataset=trainDataFrame)

In [26]:
from pyspark.ml.clustering import KMeans

# k-means with 20 clusters and a fixed seed for reproducible initialisation.
# NOTE(review): the feature vector mixes raw UnitPrice and Quantity with a 0/1
# weekday encoding and is not scaled, so the large-magnitude columns will
# dominate the (default Euclidean) distances.
kmeans = KMeans(k=20, seed=1)

In [27]:
# Train k-means on the "features" column produced by the pipeline.
kmModel = kmeans.fit(dataset=transformedTraining)

In [28]:
# Within-cluster sum of squared distances on the training data.
# NOTE(review): KMeansModel.computeCost is deprecated in Spark 2.4 and removed
# in 3.0 (kmModel.summary.trainingCost / ClusteringEvaluator are the documented
# replacements) -- confirm against the Spark version in use.
kmModel.computeCost(dataset=transformedTraining)

84553739.96537486

In [29]:
# Apply the pipeline fitted on the training split to the held-out split.
transformedTest = fittedPipeline.transform(dataset=testDataFrame)

In [30]:
# (Removed an exact duplicate of the previous cell,
# `transformedTest = fittedPipeline.transform(testDataFrame)`. Re-running it
# only rebuilt the same transformed DataFrame.)

In [31]:
# Within-cluster sum of squared distances of the test split to the trained
# centers. It is a sum over rows, and the test split has more rows than the
# training split, so compare it per row rather than as a raw total.
# NOTE(review): computeCost is deprecated in Spark 2.4 and removed in 3.0 --
# confirm against the Spark version in use.
kmModel.computeCost(dataset=transformedTest)

517507094.72221166