In [1]:
import pyspark
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import lit, desc, col, size, array_contains, isnan, udf, hour, array_min, array_max, countDistinct

from pyspark.ml  import Pipeline     
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit

In [None]:
"""
We need to create a Spark session by calling SparkSession.builder.
This step is necessary before doing anything else with Spark.
"""
from pyspark.sql.types import *  # NOTE(review): wildcard import kept; other cells may rely on names it provides
from pyspark.sql import SparkSession

# Build the session that every DataFrame operation in this notebook goes through.
# "local[1]" runs Spark inside this process with a single worker thread, and
# getOrCreate() returns an already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Example01")
    .getOrCreate()
)


In [None]:
"""
Due to parallel execution on all cores on multiple machines, 
PySpark runs operations faster than pandas. 
In other words, pandas DataFrames run operations on a single node 
whereas PySpark runs on multiple machines. 
"""

# Column names, in the same order as the fields of each record below.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

# One tuple per person; empty strings mark a missing middle or last name.
data = [
    ("James", "", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "", "2000-05-19", "M", 4000),
    ("Robert", "", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Mary", "Brown", "1980-02-17", "F", -1),
]

# Only names are supplied as the schema, so Spark infers each column's type
# from the Python values.
df = spark.createDataFrame(data=data, schema=columns)


In [None]:
# Print the schema of the current `df` as a tree: column name, type and nullability.
df.printSchema()

In [None]:
# Render the leading rows of `df` as a text table (show() prints at most 20 rows by default).
df.show()

In [None]:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Person records; the fourth field is an id that is kept as a string
# (it is empty for the last record).
data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Explicit schema instead of type inference: five nullable string columns
# followed by a nullable integer salary.
_string_columns = ["firstname", "middlename", "lastname", "id", "gender"]
schema = StructType(
    [StructField(name, StringType(), True) for name in _string_columns]
    + [StructField("salary", IntegerType(), True)]
)

df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
df.show()


In [None]:

# Column names for the employee records; types are inferred by Spark.
schema = ["employee_name", "department", "state", "salary", "age", "bonus"]

# (employee_name, department, state, salary, age, bonus)
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]

# This `df` is the frame the groupBy examples operate on.
df = spark.createDataFrame(data=simpleData, schema=schema)
df.printSchema()
df.show(truncate=False)


In [None]:
# groupBy() followed by a built-in aggregate, here on the state column.
by_state = df.groupBy("state")

# Total salary per state.
by_state.sum("salary").show(truncate=False)

# Average salary per state.
by_state.mean("salary").show(truncate=False)

# Next: group by department instead and look at the statistics of the bonus column.

In [None]:

# Number of rows (employees) in each department.
df.groupBy("department").count().show()


In [None]:

by_department = df.groupBy("department")

# Lowest salary in each department.
by_department.min("salary").show()

# Average salary in each department, printed twice: once with the default
# rendering and once with column truncation switched off.
by_department.mean("salary").show()
by_department.mean("salary").show(truncate=False)

# Next: repeat with state in place of department and bonus in place of salary.

In [None]:

# Group on two key columns at once; sum() accepts several value columns.
by_department_and_state = df.groupBy("department", "state")
by_department_and_state.sum("salary", "bonus").show()


In [None]:
# One grouping by (department, state), two separate aggregates on it.
department_state_groups = df.groupBy("department", "state")

department_state_groups.sum("salary").show()   # total salary per group
department_state_groups.mean("bonus").show()   # average bonus per group


In [None]:

# NOTE: this import rebinds the Python built-ins `sum` and `max` to Spark's
# column functions for the rest of the notebook. Later cells call these bare
# names, so the import has to stay in place.
from pyspark.sql.functions import sum, avg, max

# Several aggregates per department in one agg() call, each given an explicit
# output column name through alias().
department_summary = df.groupBy("department").agg(
    sum("salary").alias("sum_salary"),
    avg("salary").alias("avg_salary"),
    sum("bonus").alias("sum_bonus"),
    max("bonus").alias("max_bonus"),
)
department_summary.show(truncate=False)


In [None]:
# Salary and bonus aggregates for every (department, state) pair.
# `sum`, `avg` and `max` here are the Spark column functions, not Python built-ins.
salary_bonus_aggregates = [
    sum("salary").alias("sum_salary"),
    avg("salary").alias("avg_salary"),
    sum("bonus").alias("sum_bonus"),
    max("bonus").alias("max_bonus"),
]
df.groupBy("department", "state").agg(*salary_bonus_aggregates).show(truncate=False)

In [None]:

# Aggregate per department, then keep only the departments whose total bonus
# reaches the threshold. The filter runs on the aggregated column, which is
# the DataFrame counterpart of SQL's HAVING.
bonus_threshold = 50000

department_totals = df.groupBy("department").agg(
    sum("salary").alias("sum_salary"),
    avg("salary").alias("avg_salary"),
    sum("bonus").alias("sum_bonus"),
    max("bonus").alias("max_bonus"),
)
department_totals.filter(col("sum_bonus") >= bonus_threshold).show(truncate=False)


In [None]:
# Load the outlay/expenditure table from a single CSV file.
#   header=True      -> take the column names from the first line of the file.
#   inferSchema=True -> let Spark detect numeric columns. Without it every
#                       column is read as a string, and string columns are
#                       ordered lexicographically by min()/max()/describe()
#                       (e.g. "9" > "100"), which gives wrong extrema.
file1 = '../../../Lectures/BMS/DOM305/csv_for_glob/Tab_7.3.1_outlay_expenditure.csv'
df1 = spark.read.csv(file1, header=True, inferSchema=True)
df1.show()

In [None]:
# Count the rows of df1 and print the total.
num_rows = df1.count()
print(num_rows)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for a single column.
social_services_stats = df1.describe('Social Services')
social_services_stats.show()

# Collect the Spark DataFrame into a pandas DataFrame on the driver.
dfpandas = df1.toPandas()

In [None]:
# Social Services and Energy aggregates for every State/UT.
# NOTE(review): if these columns were loaded as strings, max() orders the
# values lexicographically rather than numerically — confirm the column types.
state_summary = df1.groupBy("State/UTs").agg(
    sum("Social Services").alias("sum_SS"),
    avg("Social Services").alias("avg_SS"),
    sum("Energy").alias("sum_Energy"),
    max("Energy").alias("max_Energy"),
)
state_summary.show(truncate=False)

In [None]:
# Load every CSV file found in the directory into one DataFrame (passing a
# directory path makes Spark read all files inside it).
#   header=True      -> the first line of each file holds the column names.
#   inferSchema=True -> detect numeric columns. Without it every column is a
#                       string, and min()/max() on strings compare
#                       lexicographically instead of numerically.
file2 = '../../../Lectures/BMS/DOM305/csv_for_glob/'
df2 = spark.read.csv(file2, header=True, inferSchema=True)
df2.show()

# Last expression of the cell: the combined row count is shown as the cell output.
df2.count()

In [None]:
# Collect df2 into a pandas DataFrame on the driver and display it as the cell output.
dfpandas2 = df2.toPandas()
dfpandas2

In [None]:
# Social Services and Energy aggregates per State/UT over the combined data.
# NOTE(review): if these columns were loaded as strings, max() orders the
# values lexicographically rather than numerically — confirm the column types.
state_aggregates = [
    sum("Social Services").alias("sum_SS"),
    avg("Social Services").alias("avg_SS"),
    sum("Energy").alias("sum_Energy"),
    max("Energy").alias("max_Energy"),
]
df2.groupBy("State/UTs").agg(*state_aggregates).show(truncate=False)