# PySpark DataFrames and SQL

[Jian Tao](https://orcid.org/0000-0003-4228-6089), Texas A&M University

May 1, 2021

### 1. Set up the PySpark environment first

In [None]:
# For each Google Colab, we will need to run this cell to ensure that PySpark is installed properly.
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Colab").config('spark.ui.port', '4050').getOrCreate()

# !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# !unzip -o ngrok-stable-linux-amd64.zip
# get_ipython().system_raw('./ngrok http 4050 &')
# !curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(\"\nClick me to launch (give it a minute or two)\n\"); print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

### 2. Create a DataFrame by reading from a CSV/JSON file

`spark.read.csv` can only read from local files, so we have to download the CSV file from the URL first. We can use `SparkFiles` or `pandas` to do that. For CSV files with a header row, make sure to pass `header=True` to `spark.read.csv`. When the data types of the columns are not known in advance, `inferSchema=True` lets Spark detect them automatically, but the detection is not perfect: in our example, the `horsepower` column is not recognized correctly.

In [None]:
from pyspark import SparkFiles

csv_url = "https://raw.githubusercontent.com/jtao/AdvancedML/main/data/Auto.csv"
json_url = "https://raw.githubusercontent.com/jtao/dswebinar/master/pyspark/Auto.json"

# Register both remote files with Spark; afterwards SparkFiles.get(<file name>)
# returns the path of the downloaded copy.
for source_url in (csv_url, json_url):
    spark.sparkContext.addFile(source_url)

# Alternative: build the Spark DataFrame from a pandas DataFrame.
# import pandas as pd
# df = spark.createDataFrame(pd.read_csv(csv_url))

# Read the CSV with a header row and let Spark infer the column types.
# (Variant without type inference:)
# df = spark.read.csv(SparkFiles.get("Auto.csv"), header=True, sep=",", inferSchema=False)
df = spark.read.csv(
    SparkFiles.get("Auto.csv"),
    header=True,
    sep=",",
    inferSchema=True,
)

# Inspect the resulting column types and the first five rows.
df.printSchema()
df.show(5)

In [None]:
# Load the downloaded JSON file into a DataFrame. The generic loader is an
# equivalent alternative:
# df = spark.read.load(SparkFiles.get("Auto.json"), format="json")
json_path = SparkFiles.get("Auto.json")
df = spark.read.json(json_path)

# Inspect the column types and the first ten rows.
df.printSchema()
df.show(10)

We can define an explicit schema so that `spark.read.csv` casts every column to the intended type.

In [None]:
from pyspark.sql.types import *

# Explicit column types for Auto.csv. The third StructField argument (True)
# marks each column as nullable.
# NOTE(review): mpg and displacement are declared as integers -- if the file
# holds fractional values for them they will not parse as integers; confirm
# against the data and switch those columns to DoubleType() if needed.
user_schema = StructType([
    StructField("mpg", IntegerType(), True),
    StructField("cylinders", IntegerType(), True),
    StructField("displacement", IntegerType(), True),
    StructField("horsepower", IntegerType(), True),
    StructField("weight", IntegerType(), True),
    StructField("acceleration", DoubleType(), True),
    StructField("year", IntegerType(), True),
    StructField("origin", IntegerType(), True),
    StructField("name", StringType(), True),
])

# A user-supplied schema replaces schema inference, so inferSchema is not
# passed alongside it (combining the two was redundant and misleading).
df = spark.read.csv(
    SparkFiles.get("Auto.csv"),
    header=True,
    sep=",",
    schema=user_schema,
)

# Confirm that the declared types were applied and preview five rows.
df.printSchema()
df.show(5)

In [None]:
# Re-read the JSON copy of the data set, then print its schema and leading rows.
auto_json_path = SparkFiles.get("Auto.json")
df = spark.read.json(auto_json_path)
df.printSchema()
df.show()

### 3. Create a Spark DataFrame with a list

In [None]:
# Each record is a plain tuple: (id, mpg, name).
auto_list = [
    (1, 18, "Chevrolet"),
    (2, 15, "Buick"),
    (3, 18, "Plymouth"),
    (4, 16, "Amc"),
    (5, 17, "Ford"),
]

# First attempt: no schema supplied, so Spark has to come up with the column
# names and types on its own. printSchema() shows what it chose.
df = spark.createDataFrame(auto_list)
df.printSchema()
df.show(5)

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Second attempt: name and type every column explicitly (all nullable).
users_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("id", IntegerType()),
        ("mpg", IntegerType()),
        ("name", StringType()),
    )
])

df = spark.createDataFrame(auto_list, schema=users_schema)
df.printSchema()
df.show(5)

### 3b. Create a Spark DataFrame with a list of dictionaries

In [None]:
# Same five cars as before, this time as one dictionary per row so that the
# keys ("id", "mpg", "name") travel with the data.
auto_list = [
    {"id": car_id, "mpg": car_mpg, "name": car_name}
    for car_id, car_mpg, car_name in [
        (1, 18, "Chevrolet"),
        (2, 15, "Buick"),
        (3, 18, "Plymouth"),
        (4, 16, "Amc"),
        (5, 17, "Ford"),
    ]
]

# Build the DataFrame straight from the dictionaries and inspect it.
df = spark.createDataFrame(auto_list)
df.printSchema()
df.show(5)

### 4. Operations on Spark DataFrame

In [None]:
# Replace the small demo DataFrame with the complete Auto.csv data set
# (header row present, column types inferred by Spark).
df = spark.read.csv(
    SparkFiles.get("Auto.csv"),
    header=True,
    sep=",",
    inferSchema=True,
)

In [None]:
# Project the DataFrame down to its "name" column and print five rows.
names_only = df.select("name")
names_only.show(5)

In [None]:
# Keep every row, returning the name together with mpg shifted up by 100.
shifted_mpg = df['mpg'] + 100
name_and_mpg = df.select(df['name'], shifted_mpg)
name_and_mpg.show(5)

In [None]:
# Keep only the rows whose mpg exceeds 30 and print five of them.
high_mpg = df.filter(df['mpg'] > 30)
high_mpg.show(5)

In [None]:
# Group the cars by their number of cylinders and count each group.
cars_per_cylinders = df.groupBy("cylinders").count()
cars_per_cylinders.show(10)

### 5. Running SQL queries programmatically

In [None]:
# Expose the DataFrame to SQL under the view name "auto" (replacing any
# earlier view of that name), then query it and preview the result.
df.createOrReplaceTempView("auto")

select_all_query = "SELECT * FROM auto"
sqlDF = spark.sql(select_all_query)
sqlDF.show(5)

In [None]:
# Register the DataFrame as a global temporary view named "auto".
# createOrReplaceGlobalTempView is used instead of createGlobalTempView so
# that re-running this cell does not fail because the view already exists.
df.createOrReplaceGlobalTempView("auto")

In [None]:
# Global temporary views live in the system-preserved database `global_temp`,
# so the view name has to be qualified with it.
current_session_rows = spark.sql("SELECT * FROM global_temp.auto")
current_session_rows.show(5)

# The same view is visible from a brand-new session as well.
other_session = spark.newSession()
other_session.sql("SELECT * FROM global_temp.auto").show(5)

With Spark, one can run SQL queries directly on files.

In [None]:
# Query the JSON file directly with SQL. The file was fetched through
# sparkContext.addFile, so its real location is resolved with SparkFiles.get
# instead of relying on a path relative to the current working directory.
from pyspark import SparkFiles

json_path = SparkFiles.get("Auto.json")
df = spark.sql(f"SELECT * FROM json.`{json_path}`")

In [None]:
# Print the leading rows of df using show()'s default row limit.
df.show()

### 6. References:

SQL References
https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html