Range of numbers

In [0]:
# Single-column DataFrame of the integers 0..99; the default "id" column is
# renamed to "numbers".
df = spark.range(0, 100).toDF("numbers")

In [0]:
# Print the first rows of the DataFrame as a text table.
df.show(n=20)

Transformations vs Actions

In [0]:
# `where` is a lazy transformation: it only adds a filter to the query plan.
# The result gets its own name so that re-running this cell does not stack
# another filter onto an already-filtered `df`.
even_numbers = df.where(df.numbers % 2 == 0)

# `count` is an action: it triggers the Spark job and returns the row count.
even_numbers.count()

Read a file into a DataFrame

In [0]:
# Load the 2010 flight summary CSV. The first row supplies column names and
# Spark infers the column types from the data.
fl2010 = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/professional/default/flightdata/csv/2010-summary.csv")
)


In [0]:
# Bring the first three rows back to the driver as a list of Row objects.
fl2010.head(3)

In [0]:
# Print the physical plan for sorting by `count` (nothing is executed).
fl2010.orderBy("count").explain()

In [0]:
# Number of partitions Spark uses for the output of shuffles (aggregations,
# joins, sorts) in this session.
SHUFFLE_PARTITIONS = 5
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

DataFrames & SQL

In [0]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
fl2010.createOrReplaceTempView("fl2010_tbl")

# The same aggregation expressed twice: once in SQL, once with the DataFrame
# API. Both group by the SAME column so their explain() plans can be compared
# side by side (previously the DataFrame side grouped by a different column).
td = spark.sql('''
          select ORIGIN_COUNTRY_NAME,count(*) as NumberofFlights from fl2010_tbl
          group by ORIGIN_COUNTRY_NAME
          ''')
td.explain()
fl2010.groupBy("ORIGIN_COUNTRY_NAME").count().explain()

In [0]:
# Import the Spark functions module under an alias instead of `import *`.
# The star import replaces Python built-ins such as `max` and `sum` with
# Spark column functions for every later cell.
from pyspark.sql import functions as F

# Largest value of the `count` column, computed once with SQL and once with
# the DataFrame API.
spark.sql(''' select max(count) as maximum from fl2010_tbl''').show()
fl2010.select(F.max("count").alias("max")).show()

In [0]:
# Read every daily CSV file of the retail data into one static DataFrame.
# The header row supplies column names and column types are inferred.
retail_csv_glob = "/Volumes/professional/default/retailstore/by_day/*.csv"
staticdataframe = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(retail_csv_glob)
)
staticdataframe.createOrReplaceTempView("staticdataframe_tbl")

# Keep the inferred schema; the streaming reader below is given it explicitly.
staticschema = staticdataframe.schema

In [0]:
# Plain-text form of the schema captured from the static read.
print(f"{staticschema}")

In [0]:
from pyspark.sql.functions import *

In [0]:
# Daily revenue per customer from the static data:
#   1. revenue per invoice line = UnitPrice * Quantity,
#   2. group rows by customer and 1-day window on InvoiceDate,
#   3. total the revenue and sort ascending by that total.
(
    staticdataframe
    .selectExpr("CustomerId", "UnitPrice*Quantity as Revenue", "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("Revenue")
    # `.sum("Revenue")` names its output column "sum(Revenue)". Sort on that
    # column directly. A fresh `sum("Revenue")` expression only works while
    # Python's built-in `sum` is shadowed by a star import of
    # pyspark.sql.functions, and it relies on the analyzer to rewrite an
    # aggregate inside a sort.
    .sort(col("sum(Revenue)"))
    .display()
)

In [0]:
# Read the same directory as a stream of files. The schema inferred from the
# static read is reused rather than inferred again. maxFilesPerTrigger=1
# limits each trigger to a single new file.
streamingdataframe = (
    spark.readStream
    .schema(staticschema)
    .format("csv")
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .load("/Volumes/professional/default/retailstore/by_day/*.csv")
)

In [0]:
# True when the DataFrame is backed by a streaming source.
is_streaming = streamingdataframe.isStreaming
is_streaming

In [0]:
# Total spend per customer per 1-day window on InvoiceDate, defined over the
# streaming source. Lazy: nothing runs until a writeStream query is started.
# NOTE: the window is one DAY. The original name `purchaseByCustomerPerHour`
# is kept as an alias because the sink cells below start from that name.
purchaseByCustomerPerDay = (
    streamingdataframe
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)
purchaseByCustomerPerHour = purchaseByCustomerPerDay


In [0]:
# Process all currently available input once (availableNow) and write the
# full aggregate (complete mode) to an in-memory table "customer_purchases".
# `start()` returns as soon as the query is launched. Wait for the run to
# finish; otherwise the next cell can query the in-memory table before it has
# been populated.
customer_purchases_query = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .trigger(availableNow=True)
    .start()
)
customer_purchases_query.awaitTermination()
customer_purchases_query

In [0]:
# Customers/windows with the highest total spend, read from the in-memory sink.
top_customers_sql = """
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
"""
spark.sql(top_customers_sql).show(5)


In [0]:
# Same aggregation written to the console sink in complete mode, processing
# the currently available input and then stopping (availableNow).
console_query = (
    purchaseByCustomerPerHour.writeStream
    .format("console")
    .queryName("customer_purchases_2")
    .outputMode("complete")
    .trigger(availableNow=True)
    .start()
)
console_query


In [0]:
# Tree-formatted view of the static DataFrame's schema.
(
    staticdataframe
    .printSchema()
)

In [0]:
# Prepare the static data for ML:
#   - na.fill(0): replace nulls with 0,
#   - day_of_week: weekday name of InvoiceDate ("EEEE" pattern),
#   - coalesce(5): reduce to at most 5 partitions.
preppedDataFrame = (
    staticdataframe
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)


In [0]:
# Chronological train/test split driven by ONE cut-off date, so the two
# complementary predicates cannot drift apart. Rows strictly before the
# cut-off go to training; rows on or after it go to test. Rows with a null
# InvoiceDate satisfy neither predicate and appear in neither set.
# NOTE(review): if the data starts shortly before this date, the training set
# spans only a few days — confirm the cut-off is intended.
TRAIN_TEST_SPLIT_DATE = "2010-12-03"
trainDataFrame = preppedDataFrame.where(f"InvoiceDate < '{TRAIN_TEST_SPLIT_DATE}'")
testDataFrame = preppedDataFrame.where(f"InvoiceDate >= '{TRAIN_TEST_SPLIT_DATE}'")


In [0]:
# A notebook cell only displays its last expression. Return both counts as a
# (train, test) tuple; otherwise the training count is computed and silently
# discarded.
trainDataFrame.count(), testDataFrame.count()

In [0]:
from pyspark.ml.feature import StringIndexer

# Map each weekday name to a numeric index, learned from the training data.
# handleInvalid="keep": a weekday that never appears in the (short,
# chronologically split) training set gets an extra "unknown" index when test
# data is transformed. The default ("error") raises on unseen labels instead.
# NOTE(review): this can change the width of the downstream one-hot
# encoding — re-check clustering results.
indexer = (
    StringIndexer()
    .setInputCol("day_of_week")
    .setOutputCol("day_of_week_index")
    .setHandleInvalid("keep")
)

In [0]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the weekday index into a vector column.
encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_encoded",
)

In [0]:
from pyspark.ml.feature import VectorAssembler

# Combine the numeric columns and the encoded weekday into a single
# "features" vector column.
feature_input_columns = ["UnitPrice", "Quantity", "day_of_week_encoded"]
vectorAssembler = VectorAssembler(
    inputCols=feature_input_columns,
    outputCol="features",
)

In [0]:
from pyspark.ml import Pipeline

# Chain indexing -> one-hot encoding -> vector assembly into one estimator.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [0]:
# Fit every pipeline stage on the training rows only.
fittedPipeline = transformationPipeline.fit(dataset=trainDataFrame)


In [0]:
# Apply the fitted pipeline to the training rows (adds the "features" column).
transformedTraining = fittedPipeline.transform(dataset=trainDataFrame)


In [0]:
from pyspark.ml.clustering import KMeans

# K-means with 20 clusters and a fixed seed for reproducible initialisation.
kmeans = KMeans(k=20, seed=1)


In [0]:
# Train the clustering model on the transformed training data.
kmModel = kmeans.fit(dataset=transformedTraining)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Apply the pipeline fitted on the training data to the held-out rows.
transformedTest = fittedPipeline.transform(testDataFrame)

# Test-set cost: the sum of squared distances from each test point to its
# assigned cluster center. KMeansModel.computeCost was removed in Spark 3.0,
# so compute the same quantity explicitly from the model's predictions and
# cluster centers.
testPredictions = kmModel.transform(transformedTest)
clusterCenters = kmModel.clusterCenters()


@F.udf(returnType=DoubleType())
def squared_distance_to_center(features, cluster):
    """Squared Euclidean distance from one feature vector to its cluster center."""
    return float(features.squared_distance(clusterCenters[cluster]))


testPredictions.select(
    squared_distance_to_center(
        F.col(kmModel.getFeaturesCol()), F.col(kmModel.getPredictionCol())
    ).alias("squared_distance")
).agg(F.sum("squared_distance")).first()[0]


In [0]:
from pyspark.sql import Row

# Build a DataFrame from an RDD of positional Rows (no field names), so the
# single column's name is generated by Spark.
# NOTE(review): `spark.sparkContext` may be unavailable on Spark Connect /
# shared-access clusters — confirm this cluster exposes it.
numbered_rows = [Row(value) for value in (1, 2, 3)]
spark.sparkContext.parallelize(numbered_rows).toDF()


In [0]:
# Column arithmetic: add 10 to every value of "Numbers" and show the result.
df = spark.range(100).toDF("Numbers")
plus_ten = df["Numbers"] + 10
df.select(plus_ten).show()

In [0]:
from pyspark.sql.types import *

# Instantiate a Spark SQL data type and print which Python class it is.
a = IntegerType()
a_class = type(a)
print(a_class)