In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import pandas as pd


# Start (or reuse) the Spark session that every later cell queries through.
spark = (
    SparkSession.builder
    .appName("sparkSqlSession")
    .getOrCreate()
)

# Explicit column layout for the CSV; every field is declared nullable.
schema = StructType([
    StructField(fieldName, fieldType, True)
    for fieldName, fieldType in [
        ("id", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("email", StringType()),
        ("salary", IntegerType()),
        ("employment_date", DateType()),
    ]
])

# Read the comma-separated file with a header row, applying the schema above.
df = (
    spark.read
    .format("csv")
    .schema(schema)
    .options(sep=",", header="true")
    .load("sparkSqlData.csv")
)

# Register the frame as temp view "df" so the spark.sql(...) examples can query it.
df.createOrReplaceTempView("df")

# Collect the same rows into a pandas DataFrame for the pandas examples.
dfPd = df.toPandas()

# Preview both: Spark prints five rows, the pandas head is the cell's output.
df.show(5)

dfPd.head(5)

In [None]:
# Print the Spark DataFrame's schema (column names, types, nullability).
df.printSchema()

In [None]:
#pyspark.sql
# Summary statistics for the salary column only.
df.describe("salary").show()

#pandas
# Restrict to salary as well so both outputs summarise the same column;
# a bare dfPd.describe() would also include the other numeric columns such as id.
dfPd[["salary"]].describe()

In [None]:
#pyspark.sql
# Project the two name columns and print the first five rows.
nameColumns = ["first_name", "last_name"]
df.select(*nameColumns).show(5)

#sql
# Same projection, issued against the "df" temp view.
namesQuery = "SELECT first_name, last_name \
           FROM df"
spark.sql(namesQuery).show(5)

#pandas
# Column subset by label list; the five-row head is the cell's output.
dfPd[nameColumns].head(5)

In [None]:
#pyspark.sql
# Keep rows whose salary exceeds 90000, then project the three columns of interest.
highEarners = df.filter("salary > 90000").select("first_name", "last_name", "salary")
highEarners.show(5)

#sql
# Same predicate and projection through the "df" temp view.
spark.sql("SELECT first_name, last_name, salary \
           FROM df \
           WHERE salary > 90000").show(5)

#pandas
# Boolean row mask plus a column list in a single .loc lookup.
isHighEarner = dfPd["salary"] > 90000
dfPd.loc[isHighEarner, ["first_name", "last_name", "salary"]].head(5)

## Order by

In [None]:
#pyspark.sql
# Use the schema's exact spelling "last_name" so the query does not depend on
# Spark's case-insensitive column resolution.
# Equivalent orderings: .orderBy(df.salary.desc()) or .orderBy("salary", ascending=False).
df.select("first_name", "last_name", "salary").orderBy(desc("salary")).show(5)

#sql
# Highest salaries first, via the "df" temp view.
spark.sql("SELECT first_name, last_name, salary \
           FROM df \
           ORDER BY salary DESC").show(5)

#pandas
# Same projection and descending sort on the pandas copy.
dfPd.loc[:, ["first_name", "last_name", "salary"]].sort_values(by="salary", ascending=False).head(5)

## Regexp

In [None]:
#pyspark.sql
# Match e-mails containing the text "google". There is deliberately no trailing
# "*": in a regex it quantifies only the final "e" (so it would also match
# "googl"); it is not a glob-style wildcard.
df.filter(df.email.rlike('google')).show(truncate=False, n=5)

#sql
# Same pattern through the "df" temp view.
spark.sql("SELECT * \
           FROM df \
           WHERE email REGEXP 'google'").show(truncate=False, n=5)

#pandas
# na=False turns missing e-mails into non-matches, so the boolean mask never
# contains NaN (the email column is declared nullable in the schema).
dfPd.loc[dfPd.loc[:, "email"].str.contains('google', na=False)].head(5)

In [None]:
#pyspark.sql
# First names that start with "A" or "B", or end with "a" or "b".
df.filter(df.first_name.rlike('^[AB]|[ab]$')).show(5)

#sql
# Same pattern through the "df" temp view.
spark.sql("SELECT * \
           FROM df \
           WHERE first_name REGEXP '^[AB]|[ab]$'").show(5)

#pandas
# na=False turns missing first names into non-matches, so the boolean mask never
# contains NaN (the first_name column is declared nullable in the schema).
dfPd.loc[dfPd.loc[:, "first_name"].str.contains('^[AB]|[ab]$', na=False)].head(5)

## String length

In [None]:
#pyspark.sql
# Longest first names first; ties are broken alphabetically.
firstNameLength = length("first_name").alias("first_name_length")
(
    df.select("first_name", firstNameLength)
      .orderBy(desc("first_name_length"), asc("first_name"))
      .show(5)
)

#sql
# Same length column and ordering through the "df" temp view.
spark.sql("SELECT first_name, length(first_name) AS first_name_length \
           FROM df \
           ORDER BY first_name_length DESC, first_name ASC").show(5)

#pandas
# Two-column frame (name and its length), sorted the same way as above.
firstNameLengths = dfPd[["first_name"]].assign(first_name_length=dfPd["first_name"].str.len())
firstNameLengths.sort_values(
    by=["first_name_length", "first_name"],
    ascending=[False, True],
).head(5)

In [None]:
#pyspark.sql
# Shortest last names first; ties are broken alphabetically.
df.select("last_name", length("last_name").alias("last_name_length")) \
  .orderBy(asc("last_name_length"), asc("last_name")).show(5)

#sql
# The ordered DataFrame gets its own name, so shortestLastName only ever
# holds the final string rather than first a DataFrame and then a string.
lastNamesByLength = spark.sql("SELECT last_name, length(last_name) AS last_name_length \
                              FROM df \
                              ORDER BY last_name_length ASC, last_name ASC")

lastNamesByLength.show(5)

# first() returns None for an empty result instead of raising IndexError.
shortestRow = lastNamesByLength.first()
shortestLastName = shortestRow["last_name"] if shortestRow is not None else None

print("The shortest last name alphabetically ordered is:", shortestLastName)

#pandas
# Same length column and ascending ordering on the pandas copy.
dfPd.assign(last_name_length=dfPd.loc[:, "last_name"].str.len()) \
    .loc[:, ["last_name", "last_name_length"]] \
    .sort_values(by=["last_name_length", "last_name"], ascending=[True, True]).head(5)

## Date range

In [None]:
#pyspark.sql
# Rows whose employment_date lies between the two bounds, both inclusive.
hiredInWindow = df.employment_date.between('2018-12-01', '2018-12-20')
df.where(hiredInWindow).select("last_name", "employment_date").show(5)

#sql
# Same inclusive range through the "df" temp view.
spark.sql("SELECT last_name, employment_date \
           FROM df \
           WHERE employment_date BETWEEN '2018-12-01' AND '2018-12-20'").show(5)

#pandas
# Replace the column with its pd.to_datetime conversion.
# Note: this modifies dfPd itself, so later cells see the converted column too.
dfPd["employment_date"] = pd.to_datetime(dfPd["employment_date"])

inDateWindow = dfPd["employment_date"].between('2018-12-01', '2018-12-20')
dfPd.loc[inDateWindow, ["last_name", "employment_date"]].head(5)

## Normalization

In [None]:
#pyspark.sql
# Collect the salary total to the driver once, then divide every salary by it.
# `sum` here is pyspark.sql.functions.sum (star import), not the Python builtin.
# No abs() around the ratio: the SQL and pandas variants below divide without
# one, and all three are meant to produce the same column.
salaryTotal = df.select(sum("salary")).take(1)[0][0]
df.select((df["salary"] / salaryTotal).alias("salary_normalized")).show(5)

#sql
# Same ratio, with the total computed by a scalar subquery.
spark.sql("SELECT salary/(SELECT sum(salary) FROM df) AS salary_normalized \
           FROM df").show(5)

#pandas
# Same ratio on the pandas copy; only the derived column is displayed.
dfPd.assign(salary_normalized=dfPd["salary"]/dfPd["salary"].sum()).loc[:, "salary_normalized"].head(5)

## Join

In [None]:
# Second CSV: comma-separated with a header row; Spark infers the column types.
dfJoin = (
    spark.read
    .format("csv")
    .options(sep=",", inferSchema=True, header="true")
    .load("sparkSqlData2.csv")
)

# Register it as temp view "dfJoin" for the SQL join example.
dfJoin.createOrReplaceTempView("dfJoin")

# pandas copy of the same rows for the merge example.
dfJoinPd = dfJoin.toPandas()

In [None]:
#pyspark.sql
# Join the two frames on their shared "id" column.
df.join(dfJoin, on="id").show(5)

#sql
# Same join through the two temp views.
spark.sql("SELECT * FROM df JOIN dfJoin USING (id)").show(5)

#pandas
# head(5) keeps the output to a five-row preview like the Spark variants
# instead of rendering the whole merged frame.
dfPd.merge(dfJoinPd, on="id").head(5)

## Second highest salary

In [None]:
#pyspark.sql
# Find the top salary, drop the rows that hold it, then take the max of the rest.
# `max` here is pyspark.sql.functions.max (star import), not the Python builtin.
topSalary = df.select(max("salary")).take(1)[0][0]
belowTop = df.filter(~df.salary.isin(topSalary))
belowTop.select(max("salary").alias("secondHighestSalary")).show(5)

#sql
# Same idea with the top salary computed by a scalar subquery.
spark.sql("SELECT max(salary) AS secondHighestSalary FROM df WHERE salary != (SELECT max(salary) FROM df)").show(5)

#pandas
# Mask out the rows holding the top salary, then take the max of what is left.
# Alternative: dfPd.where(dfPd["salary"] != dfPd["salary"].max()).loc[:, "salary"].max()
isTopSalary = dfPd["salary"].isin([dfPd["salary"].max()])
dfPd.loc[~isTopSalary, "salary"].max()

## Nth highest salary

In [None]:
# Rank to look up (1 = highest distinct salary); shared by all three variants
# below so the "N" in the section title is set in exactly one place.
nthRank = 5

#pyspark.sql
# dense_rank leaves no gaps after ties, so rank n is the n-th distinct salary.
# Window is imported in the notebook's top import cell.
salaryRank = dense_rank().over(Window.orderBy(desc("salary"))).alias("rank")
df.select("salary", salaryRank) \
  .where(col("rank") == nthRank).select("salary").show(5)

#sql
# int(...) guarantees that only a number is ever spliced into the SQL text.
spark.sql("SELECT salary \
           FROM (SELECT salary, DENSE_RANK() OVER (ORDER BY salary DESC) AS rank FROM df) dfTemp \
           WHERE dfTemp.rank = {n}".format(n=int(nthRank))).show(5)

#pandas
# Same dense ranking on the pandas copy; rows tied at the requested rank are all returned.
dfTemp = dfPd.assign(rank=dfPd["salary"].rank(method='dense', ascending=False))
dfTemp.loc[dfTemp["rank"] == nthRank, "salary"]

## Elapsed dates

In [None]:
#pyspark.sql
# Build the day-count expression once and reuse it for both the output column
# and the filter. current_date() matches the SQL variant below.
elapsedDays = datediff(current_date(), col("employment_date"))
df.select("last_name", "employment_date", elapsedDays.alias("elapsedDatesSinceHired")) \
  .filter(elapsedDays.between(1, 500)) \
  .orderBy(desc("employment_date")).show()

#sql
# Same 1-500 day window, most recent hires first.
spark.sql("SELECT last_name, employment_date, DATEDIFF(current_date(), employment_date) AS elapsedDatesSinceHired \
           FROM df WHERE (DATEDIFF(current_date(), employment_date)) BETWEEN 1 AND 500 \
           ORDER BY employment_date DESC").show()

#pandas
# Convert locally so this cell does not depend on an earlier cell having
# already turned employment_date into datetimes, and compute the day counts
# once so the new column and the row mask share the same "now".
employmentDates = pd.to_datetime(dfPd["employment_date"])
daysSinceHired = (pd.Timestamp.now() - employmentDates).dt.days
# head(20) mirrors the 20-row default of show() used by the Spark variants.
dfPd.assign(elapsedDatesSinceHired=daysSinceHired) \
    .loc[daysSinceHired.between(1, 500),
         ["last_name", "employment_date", "elapsedDatesSinceHired"]] \
    .sort_values(by="employment_date", ascending=False) \
    .head(20)