# PySpark SQL

**Brull Borràs, Pere Miquel**

With the extra information given to spark by the interface of SparkSQL about the internal data schema and the computation, Spark can further optimize the data processing. Thus, RDDs evolved into a new type of data structure: *Datasets*, which benefit from the engine provided by SparkSQL. If their information is structured, we will call them *DataFrames*.

### Querying structured data with SparkSQL

DataFrames can be created from external data sources, results of queries or regular RDDs. However, as we just said, they need structured information, i.e an **schema**. What if the data source is encoded in a string or it is the result of parsing a text? One will need to follow these steps:

1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a StructType matching the structure of tuples or lists specified before.
3. Apply the schema to the RDD via createDataFrame method.

Our schema will have the following fields:

- type
- region
- alc
- m_acid
- ash
- alc_ash
- mgn
- t_phenols
- flav
- nonflav_phenols
- proant
- col
- hue
- od280od315
- proline

In [1]:
from pyspark.sql.types import *
from time import time
import math

sc = SparkContext.getOrCreate()

# Load File
lines = sc.textFile("../input/Wines/wines10m.txt")
wines = lines.map(lambda l: l.split(','))

# Schema String
schemaString = "type region alc m_acid ash alc_ash mgn t_phenols flav nonflav_phenols proant col hue od280od315 proline"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD
wines_df = sqlContext.createDataFrame(wines, schema)

# Creates a temporary view using the DataFrame
wines_df.registerTempTable("wines")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT type FROM wines")

results.show(5)

+------+
|  type|
+------+
|type_1|
|type_3|
|type_2|
|type_1|
|type_2|
+------+
only showing top 5 rows



If we want to get the number of *regions* that have *alc* greater than 11, and grouped by *type*:

In [2]:
# Using direct SQL query:
results = spark.sql("SELECT type, COUNT(region) as count FROM wines WHERE alc > 11 GROUP BY type")
results.show()

+------+-----+
|  type|count|
+------+-----+
|type_3|16334|
|type_2|16449|
|type_1|16520|
+------+-----+



In [3]:
# Using queries as DataFrame operations:
wines_df.select("type","region").filter(wines_df.alc > 11).groupBy("type").count().show()

+------+-----+
|  type|count|
+------+-----+
|type_3|16334|
|type_2|16449|
|type_1|16520|
+------+-----+



For getting descriptive statistics of the column *proline*:

In [4]:
wines_df.describe('proline').show()

+-------+------------------+
|summary|           proline|
+-------+------------------+
|  count|             82278|
|   mean|1051.0387899888356|
| stddev| 547.7868318908558|
|    min|        100.019139|
|    max|        999.996608|
+-------+------------------+



### Combining SparkSQL with RDDs

Given a list of regions:

In [5]:
regions = ["Albania", "Andorra", "Armenia", "Austria", "Azerbaijan",
        "Belarus", "Belgium", "Bosnia and Herzegovina", "Bulgaria", "Croatia", "Cyprus", "Czech Republic",
        "Denmark", "Estonia", "Finland", "France", "Georgia", "Germany", "Greece", "Hungary", "Iceland", "Ireland",
        "Italy", "Kosovo", "Latvia", "Liechtenstein", "Lithuania", "Luxembourg", "Macedonia", "Malta", "Moldova",
        "Monaco", "Montenegro", "Netherlands", "Norway", "Poland", "Portugal", "Romania", "Russia", "San Marino",
        "Serbia", "Slovakia", "Slovenia", "Spain", "Sweden", "Switzerland", "Turkey", "Ukraine", "United Kingdom",
        "Vatican City (Holy See)"]

print('There are {} different region names and {} different regions.'.format(len(regions), 
                                                                             wines_df.select('region').distinct().count()))

There are 50 different region names and 1000 different regions.


Interpret the region ID modulo 50 as the index in the list of names. Then run the query:
- Region name and average hue, grouped by region.

train.groupby('Age').agg({'Purchase': 'mean'}).show()

In [6]:
# zipWithIndex returns a tuple of (item,index)
region_df = sc.parallelize(regions).zipWithIndex().toDF(['region_name','index'])
region_df.show(5)

+-----------+-----+
|region_name|index|
+-----------+-----+
|    Albania|    0|
|    Andorra|    1|
|    Armenia|    2|
|    Austria|    3|
| Azerbaijan|    4|
+-----------+-----+
only showing top 5 rows



In [7]:
# Join DataFrames
wines_names_df = wines_df.join(region_df, wines_df["region"]==region_df["index"]%50, 'inner').drop("index")

wines_names_df.select("region_name", "hue").groupBy("region_name").agg({'hue': 'mean'}).show()

+-----------+------------------+
|region_name|          avg(hue)|
+-----------+------------------+
|     Russia|1.0274380208333334|
|     Sweden|1.0110698878205127|
|     Turkey|1.0150838590163933|
|    Germany|0.9576503299663294|
|     France|1.0293217873754166|
|     Greece|1.0022269190140851|
|     Kosovo|0.9908144545454549|
|   Slovakia|1.0189964965034963|
|    Belgium|1.0121928506016018|
| San Marino|1.0132053802395218|
|    Albania|0.9868996964463652|
|    Finland|  1.00433427090301|
|    Belarus|0.9912486173402886|
|      Malta|0.9995766026490073|
|    Croatia|0.9953677453642413|
|    Andorra| 0.999903643186308|
|      Italy|1.0135391445783137|
|  Lithuania|0.9720043536977491|
|     Norway|0.9961161693290737|
|      Spain|1.0206541597444092|
+-----------+------------------+
only showing top 20 rows



Check SparkSQL documentation in this [link](http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema), which is the information source.