## Spark SQL

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import random

#### RDD vs. DataFrame

In [0]:
# (name, age) pairs. Names repeat so that the per-name average below is non-trivial.
people_data = [
    ("Brooke", 20),
    ("Denny", 31),
    ("Jules", 30),
    ("TD", 35),
    ("Brooke", 25),
    ("TD", 60),
    ("Brooke", 30),
]

##### RDD

In [0]:
dataRDD = sc.parallelize(people_data)

# Average age per name with the low-level RDD API:
# turn each age into an (age_sum, count) pair, add the pairs up per name,
# then divide the sum by the count.
agesRDD = (
    dataRDD
    .mapValues(lambda age: (age, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .mapValues(lambda totals: totals[0] / totals[1])
)
agesRDD.collect()

##### DataFrame

In [0]:
# The same per-name average with the DataFrame API: one declarative
# groupBy/agg instead of hand-written map/reduce lambdas.
data_df = spark.createDataFrame(people_data, schema=["name", "age"])
avg_df = data_df.groupBy("name").agg(avg(col("age")))
avg_df.show()

### Creating DataFrame

#### From RDD

In [0]:
some_rdd = sc.parallelize(range(1,10)).map(lambda x : (x, random.randint(0,100)))

In [0]:
some_rdd.collect()

In [0]:
kv_DF = some_rdd.toDF(["key", "value"])
kv_DF.printSchema()

In [0]:
kv_DF.show()

#### From collection

In [0]:
# One row per blog author, in the column order of the schema defined below:
# Id, First, Last, Url, Published (M/D/YYYY string), Hits, Campaigns.
data = [
    [1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
    [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
    [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
    [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
    [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
    [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
]



#### Defining a schema

##### Schema Created Programmatically

In [0]:

# Explicit schema for `data`, written as a (name, type, nullable) table.
# Only Url may be null; Campaigns is an array whose elements may be null.
_blog_fields = [
    ("Id", IntegerType(), False),
    ("First", StringType(), False),
    ("Last", StringType(), False),
    ("Url", StringType(), True),
    ("Published", StringType(), False),
    ("Hits", IntegerType(), False),
    ("Campaigns", ArrayType(StringType(), True), False),
]
schema = StructType([StructField(name, dtype, nullable) for name, dtype, nullable in _blog_fields])

##### Spark Data Types
* BooleanType
* ByteType
* IntegerType
* LongType
* FloatType
* DoubleType
* DecimalType
* StringType
* BinaryType
* TimestampType
* DateType
* ArrayType
* MapType
* StructType
* StructField (not a data type itself: one named, typed field of a StructType)

In [0]:
blogs_df = spark.createDataFrame(data, schema)
blogs_df.show()
blogs_df.printSchema()

In [0]:
blogs_df.columns

In [0]:
blogs_df.schema

#### Creating DataFrames from Data Sources

* text
* json
* csv
* jdbc
* orc
* parquet

All of these formats are read through `spark.read`, which returns a `DataFrameReader`.

##### Create a DataFrame from text file

In [0]:
%fs head /databricks-datasets/README.md

In [0]:
text_file = spark.read.text("dbfs:/databricks-datasets/README.md")
text_file.show(5, truncate=False)

##### Create a DataFrame from the NYT COVID data CSV file

In [0]:
%fs head /databricks-datasets/COVID/covid-19-data/us-counties.csv

In [0]:
cov_df = spark.read.csv("/databricks-datasets/COVID/covid-19-data/us-counties.csv")
cov_df.show(5)
cov_df.printSchema()

In [0]:
cov_df_ = spark.read.csv("/databricks-datasets/COVID/covid-19-data/us-counties.csv", header=True, inferSchema=True)
cov_df_.show()

In [0]:
cov_df_.printSchema()

In [0]:
cov_schema = StructType([
  StructField("Date",DateType(),True),
  StructField("county",StringType(),True),
  StructField("state",StringType(),True),
  StructField("fips",IntegerType(),True),
  StructField("cases",IntegerType(),True),
  StructField("deaths",IntegerType(),True)
])

In [0]:
cov_df = spark.read.csv("/databricks-datasets/COVID/covid-19-data/us-counties.csv", header=True, schema=cov_schema)
cov_df.show()

In [0]:
cov_df.printSchema()

##### Create a DataFrame from JSON file

In [0]:
%fs head /databricks-datasets/samples/people/people.json

In [0]:
# By default the JSON source expects one JSON object per line and infers the schema.
people_df = spark.read.format("json").load("dbfs:/databricks-datasets/samples/people/people.json")
people_df.show()

### DataFrame Operations

#### Transformations

* select
* selectExpr
* filter 
* where
* distinct
* dropDuplicates
* sort
* orderBy
* limit
* union
* withColumn
* withColumnRenamed
* drop
* sample
* join
* groupBy
* describe

#### Actions

* show([numRows], [truncate])
* head([n])
* first()
* take(n)
* takeAsList(n) (Scala/Java API only; not available in PySpark)
* collect()
* count()

### Working with DataFrames

In [0]:
# First 20 rows of the schema-typed NYT frame, long values truncated.
cov_df.show(n=20, truncate=True)

#### Load data

In [0]:
%fs ls /databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/

path,name,size
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/.gitignore,.gitignore,9
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-01-2021.csv,01-01-2021.csv,566153
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-02-2021.csv,01-02-2021.csv,566475
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-03-2021.csv,01-03-2021.csv,566537
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-04-2021.csv,01-04-2021.csv,566578
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-05-2021.csv,01-05-2021.csv,566788
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-06-2021.csv,01-06-2021.csv,566804
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-07-2021.csv,01-07-2021.csv,566852
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-08-2021.csv,01-08-2021.csv,567013
dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-09-2021.csv,01-09-2021.csv,566974


In [0]:
%fs head /databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-31-2021.csv


In [0]:
covid_schema = StructType([
  StructField("FIPS",IntegerType(),True),
  StructField("Admin2",StringType(),True),
  StructField("Province_State",StringType(),True),
  StructField("Country_Region",StringType(),True),
  StructField("Last_Update",TimestampType(),True),
  StructField("Lat", DoubleType(),True),
  StructField("Long_", DoubleType(),True),
  StructField("Confirmed",LongType(),True),
  StructField("Deaths",LongType(),True),
  StructField("Recovered",LongType(),True),
  StructField("Active",LongType(),True),
  StructField("CombinedKey",StringType(),True),
  StructField("Incident_Ratet",DoubleType(),True),
  StructField("Case_Fatality_Ratio",DoubleType(),True) 
])

In [0]:
#global_data = spark.read.csv("dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/01-30-2021.csv", inferSchema=True, header=True)
global_data = spark.read.csv("dbfs:/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/*.csv", schema=covid_schema, header=True)

##### Show our data

###### Covid DataFrame

In [0]:
global_data.head()

In [0]:
global_data.take(3)

In [0]:
global_data.show()

##### Make our data easier to work with

In [0]:
global_data = global_data.withColumn("date", to_date(col("Last_Update")))
global_data.show()

In [0]:
global_data.where(col("date").isNull()).count()

In [0]:
global_data = global_data.na.drop(subset=["Confirmed", "Active", "date"])

In [0]:
global_data = global_data.drop("Lat", "Long_", "CombinedKey", "Last_Update" )
global_data.show()

In [0]:
global_data = global_data.withColumnRenamed("Admin2", "county")
global_data.show()

In [0]:
global_data = global_data.filter(column("Active") >= 0)

In [0]:
global_data.persist()

##### Counting number of rows

In [0]:
global_data.count()

In [0]:
global_data.sample(0.00003).show()

##### Simple Select

In [0]:
# Four equivalent ways to refer to the same two columns in select().
for country_ref, confirmed_ref in [
    ("Country_Region", "Confirmed"),                            # plain column names
    (col("Country_Region"), col("Confirmed")),                  # col()
    (column("Country_Region"), column("Confirmed")),            # column(), same as col()
    (global_data["Country_Region"], global_data["Confirmed"]),  # bound to this DataFrame
]:
    global_data.select(country_ref, confirmed_ref).show(5)


In [0]:
# Arithmetic needs a Column: col("Confirmed") + 1 builds a Spark expression.
global_data.select(col("date"), col("Confirmed") + 1).show(5)
# global_data.select("date", "Confirmed" + 1) would not work: Python evaluates
# "Confirmed" + 1 itself (a TypeError) before Spark ever sees it.

In [0]:
# Add a boolean column flagging rows with more than 1000 confirmed cases.
(
    global_data
    .select("Country_Region", "date", "Confirmed")
    .withColumn("greater than 1000", col("Confirmed") > 1000)
    .show(100)
)
# A bare string does not work here: "Confirmed" > 1000 is evaluated by Python
# (a TypeError in Python 3) instead of becoming a Spark expression.

##### Get the oldest records

In [0]:
# Ascending by date, so the earliest records come first.
# sort() is interchangeable here: global_data.sort(global_data["date"]).show()
global_data.orderBy(col("date").asc()).show()

##### Get information about the countries

In [0]:
# Distinct countries in alphabetical order. The number of rows to show is
# computed rather than hard-coded, so no country is silently cut off.
countries = global_data.select("Country_Region").distinct().orderBy("Country_Region")
countries.show(countries.count())


##### Get the records for Poland

In [0]:
# where() and filter() are aliases; both take a Column condition.
global_data.where(col("Country_Region") == "Poland").show()
# Equivalent: global_data.filter(global_data["Country_Region"] == "Poland").show()

In [0]:
new_df = global_data.select("*").sort(global_data["date"].desc()).filter(global_data["Country_Region"] == "Poland").select("Country_Region", "date", "Confirmed", "Active").filter(year(col("date")) == 2020)
new_df.show()


In [0]:
new_df.explain(True)

In [0]:
global_data.select("Country_Region", "Confirmed", "Active", "date").filter((col("Country_Region") == "Poland") & (col("Active") >=  300000)).show()

##### Get days with the most active cases in Poland

In [0]:
# Five Polish records with the highest Active count, largest first.
(
    global_data
    .where(col("Country_Region") == "Poland")
    .select("date", "Active")
    .sort(desc("Active"))
    .limit(5)
    .show()
)

#### Perform some aggregations

##### Common Aggregation Functions
* count(col)
* countDistinct(col)
* min(col)
* max(col)
* sum(col)
* sumDistinct(col) (renamed `sum_distinct` in Spark 3.2+)
* avg(col)

In [0]:
global_data.select(countDistinct("Country_Region")).show()

In [0]:
global_data.select(max(col("Confirmed"))).show()

In [0]:
#global_data.groupBy("Country_Region").show()
global_data.groupBy("Country_Region").agg(count("*")).show()

In [0]:
num_of_provinces = global_data.groupBy("Country_Region").agg(countDistinct("Province_State"))
num_of_provinces.show()
num_of_provinces.where(col("Country_Region") == "Poland").show()

##### Get cases for each country

In [0]:
last_data = global_data.where(col("date") == "2021-01-31")

In [0]:
global_data_country = last_data.groupBy("Country_Region").agg(sum(col("Confirmed")).alias("Confirmed"), 
                                        sum(col("Active")).alias("Active"), 
                                        sum(col("Deaths")).alias("Deaths"), 
               )
gobal_data_country.orderBy(col("Active").desc()).show()

In [0]:
global_data_country.orderBy(col("Confirmed").desc()).show()

In [0]:
global_data_country = global_data.groupBy("Country_Region", "date").agg(sum(col("Confirmed")).alias("Confirmed"), 
                                        sum(col("Active")).alias("Active"), sum(col("Deaths")).alias("Deaths"))

In [0]:
global_data_country.show()

In [0]:
global_data_country.groupBy("Country_Region").agg(max(col("Confirmed")), max(col("Active"))).show()

In [0]:
global_data_country.groupBy("Country_Region").agg(avg(col("Active"))).show()

## Using SQL in Spark SQL

In [0]:
infoDF = spark.sql("select current_date() as today , 1 + 100 as value")
infoDF.show()


In [0]:
global_data.createOrReplaceTempView("covid")

In [0]:
spark.sql("SELECT * FROM covid").show()

In [0]:
# Top 10 countries by peak confirmed cases. A row of `covid` can be a single
# province/county (Province_State / county columns), so a plain max(Confirmed)
# would return the largest region rather than a country figure. Sum the rows
# per country and day first, then take each country's maximum over all days.
# NOTE(review): per-day sums assume a region's record is not repeated for the
# same date across several daily files — confirm.
most_cases = spark.sql("""
    SELECT max(Confirmed) AS max_cases, max(Deaths) AS max_deaths, Country_Region
    FROM (
        SELECT Country_Region, `date`, sum(Confirmed) AS Confirmed, sum(Deaths) AS Deaths
        FROM covid
        GROUP BY Country_Region, `date`
    ) AS daily_country
    GROUP BY Country_Region
    ORDER BY max_cases DESC
    LIMIT 10
""")
most_cases.show()




In [0]:
# Global total: each country's peak daily total (country rows summed per day,
# not the single largest province/county row), then summed over countries.
# SQL and the DataFrame API mix freely: spark.sql returns a DataFrame.
spark.sql("""
    SELECT Country_Region, max(Confirmed) AS maxConfirmed
    FROM (
        SELECT Country_Region, `date`, sum(Confirmed) AS Confirmed
        FROM covid
        GROUP BY Country_Region, `date`
    ) AS daily_country
    GROUP BY Country_Region
""").agg(sum(col("maxConfirmed"))).show()



In [0]:
# Save the top-10 table as JSON, replacing any earlier output at that path.
most_cases.write.mode("overwrite").json("/data/output/somejson")