In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf

# Create (or reuse) the SparkSession named "Chapter7" and keep a handle to its
# SparkContext. The guard is true whenever this code runs as the main module.
if __name__ == "__main__":
    spark = (
        SparkSession.builder
        .appName("Chapter7")
        .getOrCreate()
    )
    sc = spark.sparkContext

In [None]:
# Read every CSV in the retail-data folder: the first row supplies the column
# names and Spark is asked to infer the column types.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("InferSchema", "True")
    .load("file:///Users/jackshalu/Documents/Spark-The-Definitive-Guide-master/data/retail-data/all/*.csv")
)

# Show the inferred schema, then a preview of the rows.
df.printSchema()
df.show()

In [None]:
# Aggregation functions
# Every aggregate in this cell runs over the whole retail DataFrame `df`.

# Count function
from pyspark.sql.functions import count, countDistinct, approx_count_distinct

# count(column) counts the rows where that column is non-null.
df.select(count("StockCode")).show()
#df.select(countDistinct("StockCode")).show()

# On larger datasets an approximate distinct count performs better than an
# exact one; the second argument is the maximum relative standard deviation
# allowed (10% here). `approx_count_distinct` is the supported name for the
# deprecated `approxCountDistinct` alias.
df.select(approx_count_distinct("StockCode", 0.1)).show()

# first and last functions
from pyspark.sql.functions import first, last

# NOTE: no ordering is imposed here, so which rows are "first" and "last"
# depends on the order in which Spark reads the data.
df.select(first("StockCode"), last("StockCode")).show()



In [None]:
# Descriptive statistics over the retail DataFrame `df`.
# Column names that are used repeatedly below are bound once here.
price_col = "UnitPrice"
quantity_col = "Quantity"
invoice_col = "InvoiceNo"

# min / max
# NOTE: these imports shadow the Python builtins of the same name.
from pyspark.sql.functions import min, max

df.select(min(price_col), max(price_col)).show()

# sum / sumDistinct, each rounded to two decimals
# NOTE: `sum` and `round` shadow the builtins as well.
from pyspark.sql.functions import sum, sumDistinct, round

price_total = round(sum(price_col), 2)
price_distinct_total = round(sumDistinct(price_col), 2)
df.select(price_total, price_distinct_total).show()

# avg / mean
from pyspark.sql.functions import avg, mean

df.select(avg(quantity_col), mean(quantity_col)).show()

# Variance and standard deviation (default, sample and population flavours)
from pyspark.sql.functions import stddev_pop, stddev_samp, var_samp, var_pop
from pyspark.sql.functions import stddev, variance

price_stddevs = [stddev(price_col), stddev_samp(price_col), stddev_pop(price_col)]
df.select(*price_stddevs).show()

price_variances = [variance(price_col), var_samp(price_col), var_pop(price_col)]
df.select(*price_variances).show()

# Skewness and kurtosis
from pyspark.sql.functions import skewness, kurtosis

df.select(skewness(price_col), kurtosis(price_col)).show()

# Correlation and covariance between InvoiceNo and UnitPrice
from pyspark.sql.functions import corr, covar_samp, covar_pop

df.select(corr(invoice_col, price_col)).show()
df.select(
    covar_samp(invoice_col, price_col),
    covar_pop(invoice_col, price_col),
).show()

In [None]:
# Aggregating complex types
import pandas as pd
from pyspark.sql.functions import collect_set, collect_list

# The two set-valued aggregates reused throughout this cell.
country_set = collect_set("Country")
customer_id_set = collect_set("CustomerID").alias("customerid")

# Collect the set of unique values in the given columns
df.select(country_set.alias("country"), customer_id_set).show()

# Retrieve a Python list from a column: the aggregate yields a single row,
# and its first field holds the collected countries.
unique_values_rows = df.select(country_set.alias("country"), customer_id_set).collect()
collectlist = unique_values_rows[0][0]
print(collectlist)

# Convert the aggregated row into a dictionary keyed by column alias

#coldict = df.select(collect_set("Country").alias("set"),collect_set("CustomerID").alias("customerid")).toPandas().to_dict("records")[0]
unique_values_row = df.select(country_set.alias("set"), customer_id_set).collect()[0]
coldict1 = unique_values_row.asDict()
print(coldict1)

# Collect a list of all values in the given column
#df.select(collect_list("Country")).rdd.take(10)




In [None]:
# Grouping data
from pyspark.sql.functions import sum, avg, count, expr, max

# Per (invoice, customer): total, count and average of the unit price.
per_invoice = df.groupBy("InvoiceNo", "CustomerID")
per_invoice.agg(
    sum("UnitPrice"),
    expr("count(UnitPrice)"),
    avg("UnitPrice"),
).show()

# Total quantity per (customer, invoice date), shown for customer 17850 only.
quantity_per_date = df.groupBy("CustomerID", "InvoiceDate").agg(sum("Quantity"))
quantity_per_date.filter(df.CustomerID == 17850).show()

In [None]:
# Window functions
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp, col, max, desc, dense_rank, rank

# Parse InvoiceDate into a timestamp. Single-letter `M` and `d` accept both
# padded and non-padded month/day values, whereas `MM/dd` requires two digits
# under Spark 3's stricter datetime parser.
# NOTE(review): presumably the retail data uses non-padded dates such as
# 12/1/2010 8:26 -- confirm against your copy of the dataset.
dfwithdate = df.withColumn("InvoiceDate", to_timestamp("InvoiceDate", "M/d/yyyy H:mm"))
dfwithdate.filter("CustomerID == 17850 AND UnitPrice > 3").show()

# One window per (customer, invoice timestamp), largest quantity first, with a
# frame running from the start of the partition up to the current row.
windowspec = (
    Window.partitionBy("CustomerID", "InvoiceDate")
    .orderBy(desc("Quantity"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

maxpurchasequantity = max("Quantity").over(windowspec)
purchaseDenseRank = dense_rank().over(windowspec)
purchaseRank = rank().over(windowspec)

(
    dfwithdate.filter("CustomerId IS NOT NULL")
    .orderBy("CustomerId")
    .select(
        "CustomerID",
        "InvoiceDate",
        "Quantity",
        maxpurchasequantity.alias("Maxquantity"),
        purchaseDenseRank.alias("DenseRank"),
        purchaseRank.alias("Rank"),
    )
    .filter("CustomerID == 17850")
    .show()
)

# Difference between rank and dense_rank
# --------------------------------------
# dense_rank leaves no gaps in the ranking sequence when there are ties.
# If three people tie for second place in a competition, dense_rank puts all
# three in second place and the next person in third.
# rank does leave gaps: after the same three-way tie for second place, the
# next person is ranked fifth.

In [None]:
# Roll up
from pyspark.sql.functions import sum, col, grouping_id, desc

# Work on rows that have no null in any column.
dfnonull = dfwithdate.na.drop(how='any')

# Quantity totals rolled up over customer, then invoice date.
rolled_up = dfnonull.rollup("CustomerID", "InvoiceDate").agg(sum("Quantity"))
rolled_up.orderBy("CustomerID").show()

# Cube
# Quantity totals for every combination of the two grouping columns, tagged
# with the grouping id and listed from the highest id down.
cubed = (
    dfnonull.where("CustomerID IS NOT NULL")
    .cube("CustomerID", "InvoiceDate")
    .agg(grouping_id(), sum("Quantity"))
)
cubed.orderBy(desc("grouping_id()")).show()

In [None]:

from pyspark.sql.functions import collect_set

df.printSchema()

# Expose the full retail DataFrame to Spark SQL as "table1".
df.createOrReplaceTempView("table1")

# Register a two-column projection as a second view, "table2".
df2 = df.select("InvoiceNo", "UnitPrice")
df2.createOrReplaceTempView("table2")

#spark.sql("CREATE TABLE  mantable1 USING CSV  OPTIONS (header true, path file:///Users/jackshalu/Documents/Spark-The-Definitive-Guide-master/data/retail-data/all/*.csv)")
spark.sql("USE default")

# Gather the names reported by SHOW TABLES into one Python list; the bare
# expression on the last line is what the cell displays.
table_listing = spark.sql("show tables")
table_listing.select(collect_set("tableName")).collect()[0][0]

In [2]:

import json
import pandas as pd

# Load the bike-share trip data: the first row supplies the column names and
# Spark is asked to infer the column types.
df_bike = (
    spark.read.format("csv")
    .option("header", "true")
    .option("InferSchema", "True")
    .load("file:///Users/jackshalu/Documents/Spark-The-Definitive-Guide-master/data/bike-data/201508_trip_data.csv")
)

from pyspark.sql.functions import min, max, expr
# Column names spelled exactly as in the CSV header ('End Terminal', capital T),
# so the lookup does not depend on case-insensitive column resolution.
cols = ['Duration', 'Start Terminal', 'End Terminal']

#min_dict = df_bike.select(*(min(c).alias(c) for c in cols)).collect()[0].asDict()

# Minimum of each selected column, returned as one JSON string.
min_dict = df_bike.select(*(min(c).alias(c) for c in cols)).toJSON().first()

colNames = df_bike.columns

print(min_dict)
sample = df_bike

# Limit on the Spark side so only 15 rows are shipped to the driver, instead of
# converting the entire dataset to pandas and slicing afterwards.
samplejson = df_bike.limit(15).toPandas().to_json(orient='records')
#samplejson = df_bike.toJSON().take(5)

print(colNames)

print(samplejson)

#df_bike.select([(max(c) - min(c)) for c in cols]).show()

{"Duration":60,"Start Terminal":2,"End terminal":2}
['Trip ID', 'Duration', 'Start Date', 'Start Station', 'Start Terminal', 'End Date', 'End Station', 'End Terminal', 'Bike #', 'Subscriber Type', 'Zip Code']
[{"Trip ID":913460,"Duration":765,"Start Date":"8\/31\/2015 23:26","Start Station":"Harry Bridges Plaza (Ferry Building)","Start Terminal":50,"End Date":"8\/31\/2015 23:39","End Station":"San Francisco Caltrain (Townsend at 4th)","End Terminal":70,"Bike #":288,"Subscriber Type":"Subscriber","Zip Code":"2139"},{"Trip ID":913459,"Duration":1036,"Start Date":"8\/31\/2015 23:11","Start Station":"San Antonio Shopping Center","Start Terminal":31,"End Date":"8\/31\/2015 23:28","End Station":"Mountain View City Hall","End Terminal":27,"Bike #":35,"Subscriber Type":"Subscriber","Zip Code":"95032"},{"Trip ID":913455,"Duration":307,"Start Date":"8\/31\/2015 23:13","Start Station":"Post at Kearny","Start Terminal":47,"End Date":"8\/31\/2015 23:18","End Station":"2nd at South Park","End Termin

In [None]:
from pyspark.sql import functions as F

# Rename map handed to editMetaData: existing column name -> new column name.
edit_cols = dict(Duration="Dur")
# Target type for the renamed columns; the empty string means "no cast requested".
dataType = str()
def editMetaData(dataframe,edit_cols,dataType=None):
    """Rename columns of a DataFrame and optionally cast the renamed columns.

    Args:
        dataframe: DataFrame-like object providing ``withColumnRenamed`` and
            ``withColumn``.
        edit_cols: Mapping of existing column name -> new column name.
        dataType: Type every renamed column is cast to. ``None`` or an empty
            string leaves the column types untouched.

    Returns:
        The DataFrame obtained after all renames (and casts) were applied.
    """
    #dataframe = Table_utils.get_temporarytable(spark,tablename,employee_id,busdate)

    # Callers use "" to mean "no type chosen"; treat it like None instead of
    # handing an empty type string to the cast.
    cast_requested = dataType is not None and dataType != ""

    for old_name, new_name in edit_cols.items():
        dataframe = dataframe.withColumnRenamed(old_name, new_name)
        if cast_requested:
            dataframe = dataframe.withColumn(new_name, F.col(new_name).cast(dataType))

    return dataframe

# Apply the renames listed in `edit_cols` to the bike DataFrame. An empty
# `dataType` means no cast was requested, so None is passed in that case.
requested_type = None if dataType == "" else dataType
ex = editMetaData(df_bike, edit_cols, requested_type)

ex.printSchema()


# Filter
# SQL predicate assembled from its three clauses: start terminal above 50,
# end terminal below 50, subscriber type equal to 'Subscriber'.
cond = (
    "`Start Terminal` > 50"
    " and `End Terminal` < 50"
    " and `Subscriber Type` == 'Subscriber'"
)
matching_trips = df_bike.where(cond)
matching_trips.show(10)

# Register the unfiltered bike DataFrame so the same predicate can be run in SQL.
df_bike.createOrReplaceTempView("filtername")
#spark.sql("select * from filtername where `Start Terminal` > 50 and `End Terminal` < 50 and `Subscriber Type` == 'Subscriber' ").show(10)

In [13]:
from pyspark.sql import functions as F
import pandas as pd


# Column inspected for outliers.
col_o = "Start Terminal"

# Compute mean and standard deviation in a single aggregation, so the data is
# scanned by one Spark job instead of two.
stats_row = df_bike.select(
    F.mean(col_o).alias("mean"),
    F.stddev(col_o).alias("stddev"),
).collect()[0]
mean_dict = {col_o: stats_row["mean"]}
stddev_dict = {col_o: stats_row["stddev"]}

# Fail with a clear message rather than a TypeError in the arithmetic below.
if mean_dict[col_o] is None or stddev_dict[col_o] is None:
    raise ValueError(
        "Cannot compute an outlier range: mean/stddev of column %r is undefined "
        "(too few non-null values)" % col_o
    )

# Accepted range: mean +/- 3 standard deviations.
outlier = {}
outlier['min_range'] = float(mean_dict[col_o] - 3 * stddev_dict[col_o])
outlier['max_range'] = float(mean_dict[col_o] + 3 * stddev_dict[col_o])

# Keep only the values outside [min_range, max_range]. Rows where the column
# is null give a null condition and are dropped by the filter.
in_range = F.col(col_o).between(outlier['min_range'], outlier['max_range'])
df_outlier = df_bike.filter(~in_range).select(col_o)
#outlier_list = df_bike.select(F.collect_set(col_o)).collect()[0][0]

# Bring every outlying value back to the driver as one Python list.
df_outlierList = df_outlier.select(F.collect_list(col_o)).collect()[0][0]

outlier['output'] = df_outlierList

print(outlier)



{'min_range': 8.229436090118831, 'max_range': 108.66259896827418, 'output': [2, 2, 2, 2, 4, 5, 2, 4, 7, 4, 6, 2, 6, 4, 5, 8, 6, 5, 8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 7, 4, 4, 2, 2, 6, 6, 6, 6, 6, 6, 3, 4, 5, 8, 6, 6, 7, 7, 4, 4, 6, 7, 7, 7, 7, 2, 2, 2, 2, 2, 2, 4, 2, 6, 3, 5, 3, 4, 2, 3, 3, 2, 4, 2, 2, 2, 6, 2, 4, 2, 8, 6, 2, 2, 4, 2, 4, 7, 2, 7, 2, 2, 2, 2, 2, 4, 2, 2, 2, 7, 4, 2, 6, 6, 4, 4, 5, 2, 2, 7, 5, 3, 4, 6, 2, 2, 6, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 2, 6, 2, 6, 2, 2, 2, 2, 4, 4, 7, 2, 4, 2, 6, 2, 6, 4, 7, 4, 6, 6, 8, 8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 8, 2, 2, 7, 4, 4, 2, 2, 2, 2, 2, 2, 2, 2, 4, 6, 2, 2, 2, 7, 4, 6, 4, 2, 5, 3, 4, 5, 4, 4, 6, 7, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 7, 6, 4, 4, 7, 6, 6, 2, 6, 7, 2, 2, 2, 2, 2, 2, 2, 4, 2, 2, 2, 5, 4, 5, 6, 3, 4, 4, 3, 2, 2, 2, 2, 2, 2, 2, 6, 2, 2, 4, 2, 2, 7, 4, 6, 7, 2, 4, 6, 6, 2, 5, 6, 5, 5, 4, 4, 8, 4, 8, 2, 2, 6, 2, 3, 2, 4, 6, 4, 2, 2, 3, 6, 4, 4, 5, 6, 2, 8, 6, 2, 2, 4, 6, 2, 2, 7, 2, 2, 3, 7, 6, 4, 6, 3, 6, 6,

AnalysisException: 'Resolved attribute(s) End Station#16 missing from Start Station#13,Start Terminal#14 in operator !Project [Start Station#13, Start Terminal#14, End Station#16 AS End Station#735].;;\n!Project [Start Station#13, Start Terminal#14, End Station#16 AS End Station#735]\n+- Project [Start Station#13, Start Terminal#14]\n   +- Relation[Trip ID#10,Duration#11,Start Date#12,Start Station#13,Start Terminal#14,End Date#15,End Station#16,End Terminal#17,Bike ##18,Subscriber Type#19,Zip Code#20] csv\n'