## Spark DataFrames


### [Spark SQL](https://spark.apache.org/sql/)
The Spark ecosystem consists of Spark Core, the main data processing framework, and first-party components including Spark SQL, Spark MLlib, Spark ML, Spark Streaming, and GraphX.

Spark SQL is Apache Spark's module for working with structured data, that is, data with a defined set of fields for each record. This structure, known as the schema, lets Spark store data more efficiently than Java serialization and run SQL queries on it. Spark SQL has the following features:
1. *Integrated*: Seamlessly mix SQL queries with Spark programs
2. *Uniform Data Access*: Connect to any data source the same way (Hive, Avro, Parquet, ORC, JSON, JDBC, CSV)
3. *Hive Integration*: Run SQL or HiveQL queries on existing warehouses
4. *Standard Connectivity*: Connect through industry standard JDBC and ODBC
5. *Performance*: Spark SQL includes a cost-based optimizer, columnar storage and code generation to make queries fast. At the same time, it scales to thousands of nodes and multi-hour queries using the Spark engine, which provides full mid-query fault tolerance.


### Spark DataSets and DataFrames


A **Dataset** is a distributed collection of data that also carries metadata about the structure (schema) of the data being stored. Datasets combine the benefits of RDDs (strong typing, the ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine, which lets Spark store and process the data much more efficiently than Java serialization. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not support the Dataset API because it is dynamically typed (variables are not declared with a type; they are simply assigned). However, because of Python's dynamic nature, many of the benefits of the Dataset API are already available: for example, you can naturally access a field of a row by name with `row.columnName`. The case for R is similar.

A **DataFrame** is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. 

Starting with Spark 2.0, the Dataset and DataFrame APIs were unified. A Dataset has two distinct API characteristics: a strongly-typed API and an untyped API. A DataFrame can be considered the untyped view of a Dataset, that is, a Dataset of `Row` (in Scala, `DataFrame` is a type alias for `Dataset[Row]`), where a Row is a generic untyped JVM object. A typed Dataset, in contrast, is a collection of strongly-typed JVM objects.

Spark DataFrames are heavily inspired by pandas, and Spark lets us write **Pandas user-defined functions (UDFs)** that use Apache Arrow to pass data between the JVM and Python, so that the function runs as a *vectorized computation* instead of *row-by-row operations*. This can lead to significant performance gains on large datasets. Vectorization means processing an entire array (or batch) of elements in one operation rather than handling each element individually.
[Further reading: RealPython on vectorization](https://realpython.com/numpy-array-programming/)
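Vectorization itself is easy to see in plain pandas, independent of Spark. The sketch below (the wage values are synthetic) computes the same result once with a Python-level loop and once with a single whole-Series expression; a Pandas UDF receives its input batches as Series, so its body can be written in the second style.

```python
import numpy as np
import pandas as pd

# Synthetic wages in thousands of euros.
wages_k = pd.Series(np.arange(1, 100_001))

# Row-by-row: a Python loop touches one element at a time.
row_by_row = pd.Series([w * 1000 for w in wages_k])

# Vectorized: one expression over the whole Series; the loop runs in compiled NumPy code.
vectorized = wages_k * 1000
```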

SQL tables and views are essentially the same thing as DataFrames; we simply run SQL against them instead of DataFrame code. You can express some of your data manipulations in SQL and others with the DataFrame API, and Spark compiles both to the same underlying execution plan.


### [pyspark.sql documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#)

## Spark SQL Demo

### [Spark Session](https://sparkour.urizone.net/recipes/understanding-sparksession/)
When working with RDDs, we used SparkContext.
Starting with Spark 2.0, SparkSession provides a single entry point for many operations that were previously scattered across multiple classes (such as SQLContext and HiveContext).
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read files.

In [None]:
from pyspark.sql import SparkSession  # entry point for the DataFrame and SQL APIs

# Create (or reuse) a SparkSession using the builder pattern
spark = SparkSession.builder \
                    .master("local") \
                    .appName("FIFAPlayers") \
                    .getOrCreate()

dfReader = spark.read  # DataFrameReader used to load external data sources

# header: use the first line as column names; inferSchema: guess column types from the data
df = dfReader.option("header", "true") \
           .option("inferSchema", "true") \
           .csv("fifadata.csv")

# Print the schema Spark inferred (the DataFrame is distributed, potentially across a cluster)
df.printSchema()



root
 |-- _c0: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Photo: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- Flag: string (nullable = true)
 |-- Overall: integer (nullable = true)
 |-- Potential: integer (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club Logo: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Wage: string (nullable = true)
 |-- Special: integer (nullable = true)
 |-- Preferred Foot: string (nullable = true)
 |-- International Reputation: integer (nullable = true)
 |-- Weak Foot: integer (nullable = true)
 |-- Skill Moves: integer (nullable = true)
 |-- Work Rate: string (nullable = true)
 |-- Body Type: string (nullable = true)
 |-- Real Face: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Jersey Number: integer (nullable = true)
 |-- Joined: string (nullable = true)
 |-- Loaned From: string (nu

In [None]:
dfWithSomeColumns = df.select("Name", "Nationality", "Wage")

In [None]:
print("------ Print some columns from the table ")
dfWithSomeColumns.show() # show(n=20, truncate=True): prints the first n rows, cutting strings longer than 20 characters

------ Print some columns from the table 
+-----------------+-----------+-----+
|             Name|Nationality| Wage|
+-----------------+-----------+-----+
|         L. Messi|  Argentina|€565K|
|Cristiano Ronaldo|   Portugal|€405K|
|        Neymar Jr|     Brazil|€290K|
|           De Gea|      Spain|€260K|
|     K. De Bruyne|    Belgium|€355K|
|        E. Hazard|    Belgium|€340K|
|        L. Modrić|    Croatia|€420K|
|        L. Suárez|    Uruguay|€455K|
|     Sergio Ramos|      Spain|€380K|
|         J. Oblak|   Slovenia| €94K|
|   R. Lewandowski|     Poland|€205K|
|         T. Kroos|    Germany|€355K|
|         D. Godín|    Uruguay|€125K|
|      David Silva|      Spain|€285K|
|         N. Kanté|     France|€225K|
|        P. Dybala|  Argentina|€205K|
|          H. Kane|    England|€205K|
|     A. Griezmann|     France|€145K|
|    M. ter Stegen|    Germany|€240K|
|      T. Courtois|    Belgium|€240K|
+-----------------+-----------+-----+
only showing top 20 rows



In [None]:
print("--- Print a few players from Brazil-----")
dfWithSomeColumns.filter(dfWithSomeColumns['Nationality']=='Brazil').show()

--- Print a few players from Brazil-----
+---------------+-----------+-----+
|           Name|Nationality| Wage|
+---------------+-----------+-----+
|      Neymar Jr|     Brazil|€290K|
|       Casemiro|     Brazil|€285K|
|       Coutinho|     Brazil|€340K|
|        Marcelo|     Brazil|€285K|
|   Thiago Silva|     Brazil|€165K|
|        Ederson|     Brazil|€125K|
|Roberto Firmino|     Brazil|€195K|
|    Alex Sandro|     Brazil|€160K|
|  Douglas Costa|     Brazil|€175K|
|    Fernandinho|     Brazil|€185K|
|        Alisson|     Brazil|€115K|
|       Paulinho|     Brazil|€235K|
|          Naldo|     Brazil| €38K|
|        Miranda|     Brazil| €96K|
|    Filipe Luís|     Brazil| €81K|
|    Alex Telles|     Brazil| €22K|
|        Fabinho|     Brazil|€120K|
|     Marquinhos|     Brazil| €90K|
|        Willian|     Brazil|€175K|
|          Jonas|     Brazil| €25K|
+---------------+-----------+-----+
only showing top 20 rows



In [None]:
print("---Print the count of players per country----")
groupedData = dfWithSomeColumns.groupBy("Nationality")
groupedData.count().show()
groupedData.count().orderBy("count", ascending = False).show()

---Print the count of players per country----
+--------------+-----+
|   Nationality|count|
+--------------+-----+
|          Chad|    2|
|        Russia|   79|
|      Paraguay|   85|
|       Senegal|  130|
|        Sweden|  397|
|        Guyana|    3|
|   Philippines|    2|
|       Eritrea|    2|
|          Fiji|    1|
|        Turkey|  303|
|          Iraq|    7|
|       Germany| 1198|
|St Kitts Nevis|    3|
|       Comoros|    6|
|   Afghanistan|    4|
|   Ivory Coast|  100|
|        Jordan|    1|
|        Rwanda|    1|
|         Sudan|    3|
|        France|  914|
+--------------+-----+
only showing top 20 rows

+-------------------+-----+
|        Nationality|count|
+-------------------+-----+
|            England| 1662|
|            Germany| 1198|
|              Spain| 1072|
|          Argentina|  937|
|             France|  914|
|             Brazil|  827|
|              Italy|  702|
|           Colombia|  618|
|              Japan|  478|
|        Netherlands|  453|
|           

In [None]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("fifa")

sqlDF = spark.sql("SELECT Name, Nationality, Wage FROM fifa ORDER BY Name")
sqlDF.show(5)

+-------------+-----------+----+
|         Name|Nationality|Wage|
+-------------+-----------+----+
|     A. Abang|   Cameroon| €1K|
|A. Abdellaoui|    Algeria| €4K|
| A. Abdennour|    Tunisia|€24K|
|      A. Abdi|Switzerland|€13K|
|A. Abdu Jaber|    Eritrea| €4K|
+-------------+-----------+----+
only showing top 5 rows



In [None]:
query = """SELECT Nationality, COUNT(*)
           FROM fifa
           GROUP BY Nationality
           ORDER BY COUNT(*) DESC"""
sqlDF = spark.sql(query).show(10)

+-----------+--------+
|Nationality|count(1)|
+-----------+--------+
|    England|    1662|
|    Germany|    1198|
|      Spain|    1072|
|  Argentina|     937|
|     France|     914|
|     Brazil|     827|
|      Italy|     702|
|   Colombia|     618|
|      Japan|     478|
|Netherlands|     453|
+-----------+--------+
only showing top 10 rows



## Task: convert Wage (e.g. €1K) into a new numeric column Wage_euro (e.g. 1000)
Apply Pandas UDFs to turn the Wage strings into numbers.

In [None]:
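import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType, IntegerType

# Vectorized (Pandas) UDF: Spark passes each batch of the column as a pandas Series.
# "€565K" -> "565": drop the first ("€") and last ("K") characters, then any stray quotes.
def wage_euros_func(wage: pd.Series) -> pd.Series:
    return wage.str[1:-1].str.replace('"', '', regex=False)

wage_euros = pandas_udf(wage_euros_func, StringType())

# Scalar Pandas UDF that multiplies a whole batch of integers at once
@pandas_udf(IntegerType())
def multiply_1000(x: pd.Series) -> pd.Series:
    return x * 1000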



In [None]:
from pyspark.sql.functions import length
dfWithSomeColumns.filter(length('Wage')<3).show()

+------------+------------+----+
|        Name| Nationality|Wage|
+------------+------------+----+
|  L. Paredes|   Argentina|  €0|
|A. Granqvist|      Sweden|  €0|
|    A. Lunev|      Russia|  €0|
|I. Smolnikov|      Russia|  €0|
|   A. Dzyuba|      Russia|  €0|
|   Luís Neto|    Portugal|  €0|
|  D. Kuzyaev|      Russia|  €0|
|      G. Sio| Ivory Coast|  €0|
|   J. Villar|    Paraguay|  €0|
|  C. Riveros|    Paraguay|  €0|
|B. Dzsudzsák|     Hungary|  €0|
|Y. Gazinskiy|      Russia|  €0|
|  V. Cáceres|    Paraguay|  €0|
|      P. Tau|South Africa|  €0|
|    C. Cueva|        Peru|  €0|
|   M. Mevlja|    Slovenia|  €0|
|   K. Traoré| Ivory Coast|  €0|
|   K. Fofana| Ivory Coast|  €0|
| V. Claesson|      Sweden|  €0|
|    B. Jokič|    Slovenia|  €0|
+------------+------------+----+
only showing top 20 rows
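Instead of filtering the `€0` rows out, the conversion can handle them directly. This is a plain-pandas sketch of the per-batch logic a Pandas UDF could run; it assumes every wage is either `€0` or a whole number of thousands ending in `K`, as in the rows shown here, and the function name `wage_to_euros` is hypothetical. Such a function could then be wrapped with `pandas_udf` and a `"long"` return type.

```python
import pandas as pd

# Wage strings in the format shown in the tables above.
wages = pd.Series(["€565K", "€94K", "€0"])

def wage_to_euros(wage: pd.Series) -> pd.Series:
    """Vectorized conversion: '€565K' -> 565000, '€0' -> 0 (assumes only 'K' suffixes)."""
    digits = wage.str.lstrip("€")
    is_thousands = digits.str.endswith("K")
    numbers = digits.str.rstrip("K").astype("int64")
    # Keep the plain number where there is no 'K', otherwise scale by 1000.
    return numbers.where(~is_thousands, numbers * 1000)

converted = wage_to_euros(wages)
```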



In [None]:
df2 = dfWithSomeColumns.filter(length('Wage')>=3)
df2.show()

+-----------------+-----------+-----+
|             Name|Nationality| Wage|
+-----------------+-----------+-----+
|         L. Messi|  Argentina|€565K|
|Cristiano Ronaldo|   Portugal|€405K|
|        Neymar Jr|     Brazil|€290K|
|           De Gea|      Spain|€260K|
|     K. De Bruyne|    Belgium|€355K|
|        E. Hazard|    Belgium|€340K|
|        L. Modrić|    Croatia|€420K|
|        L. Suárez|    Uruguay|€455K|
|     Sergio Ramos|      Spain|€380K|
|         J. Oblak|   Slovenia| €94K|
|   R. Lewandowski|     Poland|€205K|
|         T. Kroos|    Germany|€355K|
|         D. Godín|    Uruguay|€125K|
|      David Silva|      Spain|€285K|
|         N. Kanté|     France|€225K|
|        P. Dybala|  Argentina|€205K|
|          H. Kane|    England|€205K|
|     A. Griezmann|     France|€145K|
|    M. ter Stegen|    Germany|€240K|
|      T. Courtois|    Belgium|€240K|
+-----------------+-----------+-----+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import desc

# Strip "€"/"K", cast to int, scale to euros, then sort by wage (highest first)
(df2.withColumn('Wage_num', wage_euros(col('Wage')))
    .select('Name', 'Nationality', col('Wage_num').cast(IntegerType()).alias('Wage_num'))
    .withColumn('Wage_euro', multiply_1000(col('Wage_num')))
    .select('Name', 'Nationality', 'Wage_euro')
    .sort(desc('Wage_euro'))
    .show())

+-----------------+-----------+---------+
|             Name|Nationality|Wage_euro|
+-----------------+-----------+---------+
|         L. Messi|  Argentina|   565000|
|        L. Suárez|    Uruguay|   455000|
|        L. Modrić|    Croatia|   420000|
|Cristiano Ronaldo|   Portugal|   405000|
|     Sergio Ramos|      Spain|   380000|
|     K. De Bruyne|    Belgium|   355000|
|         T. Kroos|    Germany|   355000|
|          G. Bale|      Wales|   355000|
|        E. Hazard|    Belgium|   340000|
|         Coutinho|     Brazil|   340000|
|             Isco|      Spain|   315000|
|  Sergio Busquets|      Spain|   315000|
|     J. Rodríguez|   Colombia|   315000|
|        S. Agüero|  Argentina|   300000|
|        Neymar Jr|     Brazil|   290000|
|         Casemiro|     Brazil|   285000|
|          Marcelo|     Brazil|   285000|
|      David Silva|      Spain|   285000|
|    P. Aubameyang|      Gabon|   265000|
|           De Gea|      Spain|   260000|
+-----------------+-----------+---------+

In [None]:
spark.stop()