In [2]:
import pyspark
#Importing the Libraries
from pyspark.sql import SparkSession

In [3]:
# Create (or reuse) a local Spark session running with one worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("PySparkPractice")
    .getOrCreate()
)

In [None]:
# Read the e-commerce CSV: the first row supplies column names, and `"` is the
# escape character for quotes inside quoted fields.
df = spark.read.csv(
    'datacamp_ecommerce.csv',
    header=True,
    escape="\"",
)

In [None]:
# Preview the imported dataset (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

In [None]:
# Add a derived column: Weight divided by 1000.
# NOTE(review): presumably Weight is in grams, hence "Kg" — confirm the source unit.
df = df.withColumn('Weight in Kg', df['Weight'] / 1000)

In [None]:
# Give the derived column a more descriptive name.
df = df.withColumnRenamed(existing="Weight in Kg", new="Weight in Kilograms")

In [None]:
# Show the original weight next to its kilogram conversion (columns selected by name).
df.select('Weight', 'Weight in Kilograms').show()

In [None]:
# Creating a column alias: display "Weight in Kilograms" under the header "Kilograms".
df.select(
    df['Weight in Kilograms'].name("Kilograms")
).show()

In [None]:
# Different ways of type conversion in PySpark:
#   1. Column.cast() with DataType objects,
#   2. SQL CAST expressions through selectExpr(),
#   3. SQL type-named functions (STRING(), BOOLEAN(), DATE()) on a temp view.
# NOTE(review): assumes `df` has `age`, `isGraduated` and `jobStartDate` columns — the
# e-commerce CSV loaded above may not; TODO confirm or build a small sample DataFrame.
# The imports live here because nothing earlier in the notebook brings these names into scope.
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType, DateType, StringType

# 1. withColumn + cast(): replaces each column with its converted version.
df2 = (
    df.withColumn("age", col("age").cast(StringType()))
    .withColumn("isGraduated", col("isGraduated").cast(BooleanType()))
    .withColumn("jobStartDate", col("jobStartDate").cast(DateType()))
)
df2.printSchema()

# 2. selectExpr with SQL CAST; the result keeps only these three columns.
df3 = df2.selectExpr(
    "cast(age as int) age",
    "cast(isGraduated as string) isGraduated",
    "cast(jobStartDate as string) jobStartDate",
)
df3.printSchema()
df3.show(truncate=False)

# 3. Spark SQL type functions against a temporary view.
df3.createOrReplaceTempView("CastExample")
df4 = spark.sql("SELECT STRING(age),BOOLEAN(isGraduated),DATE(jobStartDate) from CastExample")
df4.printSchema()
df4.show(truncate=False)


In [None]:
# Performing arithmetic and comparison operations on Columns.
# Uses its own small DataFrame bound to `arith_df` (not `df`), so the e-commerce data
# loaded above is not overwritten for the cells that follow.
data = [(100, 2, 1), (200, 3, 4), (300, 4, 4)]
arith_df = spark.createDataFrame(data).toDF("col1", "col2", "col3")

# Arithmetic operators: +, -, *, /, % between two columns (one result table each).
for column_expr in (
    arith_df.col1 + arith_df.col2,
    arith_df.col1 - arith_df.col2,
    arith_df.col1 * arith_df.col2,
    arith_df.col1 / arith_df.col2,
    arith_df.col1 % arith_df.col2,
):
    arith_df.select(column_expr).show()

# Comparison operators: >, <, == produce boolean columns.
for column_expr in (
    arith_df.col2 > arith_df.col3,
    arith_df.col2 < arith_df.col3,
    arith_df.col2 == arith_df.col3,
):
    arith_df.select(column_expr).show()

In [None]:
# Grouping and sorting data: total salary per state, keeping states above a threshold.
# NOTE(review): assumes `df` has `state` and `salary` columns — TODO confirm against the loaded data.
# Functions are used via the `F` namespace so the Python builtin `sum` is not shadowed.
from pyspark.sql import functions as F

# Group, aggregate and filter once; the sort examples below reuse the result.
dfFilter = (
    df.groupBy("state")
    .agg(F.sum("salary").alias("sum_salary"))
    .filter(F.col("sum_salary") > 100000)
)

# Sort by the aggregated column, ascending.
dfFilter.sort(F.asc("sum_salary")).show()

# Sort by the aggregated column, descending.
dfFilter.sort(F.desc("sum_salary")).show()

In [None]:
# Aggregate functions over the `salary` column.
# NOTE(review): assumes `df` has `salary` and `department` columns — TODO confirm against the loaded data.
# Spark functions are accessed through the `F` namespace so Python builtins such as
# sum/min/max stay unshadowed in the notebook.
from pyspark.sql import functions as F

# collect()[0][0] extracts the single aggregated value from the one-row result.
print("approx_count_distinct: " +
      str(df.select(F.approx_count_distinct("salary")).collect()[0][0]))

print("avg: " + str(df.select(F.avg("salary")).collect()[0][0]))

# collect_list - with duplicates
df.select(F.collect_list("salary")).show(truncate=False)

# collect_set - without duplicates
df.select(F.collect_set("salary")).show(truncate=False)

# countDistinct over the (department, salary) pair. Bound to its own name instead of
# overwriting `df2` from the type-conversion cell, and actually displayed.
count_distinct_df = df.select(F.countDistinct("department", "salary"))
count_distinct_df.show(truncate=False)

# count - index [0][0] so the number is printed, not the whole Row
print("count: " + str(df.select(F.count("salary")).collect()[0][0]))

# Single-column aggregates, one result table each: first, last, kurtosis, max, min, mean, skewness.
for agg_fn in (F.first, F.last, F.kurtosis, F.max, F.min, F.mean, F.skewness):
    df.select(agg_fn("salary")).show(truncate=False)

df.select(F.stddev("salary"), F.stddev_samp("salary"),
          F.stddev_pop("salary")).show(truncate=False)

# NOTE(review): newer Spark releases also name sumDistinct `sum_distinct` — confirm target Spark version.
for agg_fn in (F.sum, F.sumDistinct):
    df.select(agg_fn("salary")).show(truncate=False)

df.select(F.variance("salary"), F.var_samp("salary"),
          F.var_pop("salary")).show(truncate=False)

In [None]:
# Data pre-processing with PySpark.
# NOTE(review): assumes `df` has `department` and `salary` columns — TODO confirm against the loaded data.

# Drop every row that has a null in any column.
df.na.drop("any").show()

# dropDuplicates on multiple columns: one row per distinct (department, salary) pair.
# Given its own name so this result is no longer silently overwritten below.
dropDisMultiDF = df.dropDuplicates(["department", "salary"])
dropDisMultiDF.show(truncate=False)

# dropDuplicates on a single column, keeping only that column.
dropDisDF = df.dropDuplicates(["salary"]).select("salary")
dropDisDF.show(truncate=False)

In [None]:
# Working with datetime values.
# NOTE(review): assumes `df` has an `input` column holding date/timestamp values — TODO confirm.
# The date functions were never imported in the notebook; use them via the `F` namespace.
from pyspark.sql import functions as F

# current_date()
df.select(F.current_date().alias("current_date")).show(1)

# date_format()
df.select(F.col("input"),
          F.date_format(F.col("input"), "MM-dd-yyyy").alias("date_format")).show()

# to_date() - four-letter year pattern, matching the pattern used above
df.select(F.col("input"),
          F.to_date(F.col("input"), "yyyy-MM-dd").alias("to_date")).show()

# datediff()
df.select(F.col("input"),
          F.datediff(F.current_date(), F.col("input")).alias("datediff")).show()

# months_between()
df.select(F.col("input"),
          F.months_between(F.current_date(), F.col("input")).alias("months_between")).show()

# hour(), minute(), second() - on `df` like the rest of this cell; `df3` (built by selectExpr
# in the type-conversion cell) only has age/isGraduated/jobStartDate, so it has no `input`.
df.select(F.col("input"),
          F.hour(F.col("input")).alias("hour"),
          F.minute(F.col("input")).alias("minute"),
          F.second(F.col("input")).alias("second")).show(truncate=False)


In [None]:
# Window functions: ranking and aggregate functions per department.
# NOTE(review): assumes `df` has `department` and `salary` columns — TODO confirm against the loaded data.
from pyspark.sql import Window
from pyspark.sql import functions as F

# row_number() needs an ordered window. The ordering only decides which row becomes row 1;
# the selected aggregate columns come from windowSpecAgg and do not depend on it.
windowSpec = Window.partitionBy("department").orderBy("salary")
# No ordering here, so each aggregate is computed over the whole department partition.
windowSpecAgg = Window.partitionBy("department")

(
    df.withColumn("row", F.row_number().over(windowSpec))
    .withColumn("avg", F.avg(F.col("salary")).over(windowSpecAgg))
    .withColumn("sum", F.sum(F.col("salary")).over(windowSpecAgg))
    .withColumn("min", F.min(F.col("salary")).over(windowSpecAgg))
    .withColumn("max", F.max(F.col("salary")).over(windowSpecAgg))
    # keep one row per department
    .where(F.col("row") == 1)
    .select("department", "avg", "sum", "min", "max")
    .show()
)

In [None]:
# Joining DataFrames.
# NOTE(review): `empDF` and `deptDF` are not created anywhere in this section of the notebook;
# this cell presumably expects employee/department frames with `emp_id`, `name`,
# `emp_dept_id`, `superior_emp_id` and `dept_id` columns — TODO define them before running.
from pyspark.sql.functions import col

join_cond = empDF.emp_dept_id == deptDF.dept_id

# Every join type demonstrated, including Spark's synonyms
# ("outer"/"full"/"fullouter", "left"/"leftouter", "right"/"rightouter").
for join_type in ("inner", "outer", "full", "fullouter",
                  "left", "leftouter", "right", "rightouter",
                  "leftsemi", "leftanti"):
    print(f"join type: {join_type}")
    empDF.join(deptDF, join_cond, join_type).show(truncate=False)

# Self join: pair each employee with their superior.
(
    empDF.alias("emp1")
    .join(empDF.alias("emp2"), col("emp1.superior_emp_id") == col("emp2.emp_id"), "inner")
    .select(col("emp1.emp_id"), col("emp1.name"),
            col("emp2.emp_id").alias("superior_emp_id"),
            col("emp2.name").alias("superior_emp_name"))
    .show(truncate=False)
)

# The same join expressed in Spark SQL over temp views.
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

# Keep the DataFrames (show() returns None, so it is called separately).
joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(truncate=False)

joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(truncate=False)

# Join on multiple DataFrames.
# NOTE(review): `df1` is never defined in this notebook and `df2`/`df3` were bound earlier to
# unrelated frames; this needs frames with `id1`/`id2`/`id3` columns — TODO define them.
(
    df1.join(df2, df1.id1 == df2.id2, "inner")
    .join(df3, df1.id1 == df3.id3, "inner")
    .show(truncate=False)
)

In [None]:
# Exploratory Data Analysis (EDA) using PySpark.
# NOTE(review): `sales_df`, `country_df`, `sale_sum_df` and `chan_df` are not created anywhere
# in this section of the notebook — TODO load them before running this cell.
# Spark's round/year/col/desc are used via `F`; the builtin round() cannot take a column name.
from pyspark.sql import functions as F

# DataFrame schema
sales_df.printSchema()

# List of columns (a bare expression in the middle of a cell is not displayed, so print it)
print(sales_df.columns)

# Filter condition with selected columns
country_df.select('COUNTRY_ID',
                  'COUNTRY_ISO_CODE',
                  'COUNTRY_NAME').filter(country_df.COUNTRY_NAME == 'India').show()

# GroupBy and aggregation: total AMOUNT_SOLD per customer and year
cust_wise_df = sale_sum_df.groupBy(F.round('CUST_ID', 0).alias('CUST_ID'),
                                   F.year(sale_sum_df['TIME_ID']).alias('YEAR')).sum('AMOUNT_SOLD')

# Data sorting
cust_wise_df.orderBy(cust_wise_df.CUST_ID).show(15)
cust_wise_df.filter(cust_wise_df.CUST_ID == 3261).show()

# Data insights: find out which channel contributed most to the sales
c_df = chan_df.select(F.col('CHANNEL_ID').alias('CHANNEL_ID_C'),
                      F.col('CHANNEL_DESC').alias('CHANNEL_NAME'))
sa_df = sales_df.select(F.col('CHANNEL_ID').alias('CHANNEL_ID_S'), 'AMOUNT_SOLD')
chan_sales_df = sa_df.join(c_df, c_df.CHANNEL_ID_C == sa_df.CHANNEL_ID_S, how='inner')
chan_sale = chan_sales_df.groupBy(F.round('CHANNEL_ID_C', 0).alias('CHANNEL_ID')).sum('AMOUNT_SOLD')
chan_top_sales = chan_sale.withColumnRenamed('sum(AMOUNT_SOLD)', 'TOT_AMOUNT')
# Largest total first, so the top-contributing channel is the first row.
chan_top_sales.orderBy(F.desc('TOT_AMOUNT')).show()

In [24]:
# Reverse a list with slicing. The variable is named `numbers` rather than `list` so the
# builtin `list` stays usable in later cells of the same kernel.
numbers = [1, 2, 3, 4, 5]
print(numbers[::-1])

[5, 4, 3, 2, 1]


In [33]:
# Dictionary lookup with a fallback message for missing keys.
dict1 = {1: "one", 2: "two", 3: "three"}
print(dict1)
val = 2
# dict.get returns the mapped value, or the default text when `val` is not a key.
print(dict1.get(val, "does not exist"))

{1: 'one', 2: 'two', 3: 'three'}
two


In [35]:
# A lambda bound to a name: returns its argument multiplied by itself.
x = lambda num: num * num
print(x(3))

9
