# Setup

In [2]:
# Location of the raw activity-log CSV used throughout this notebook.
CSV_PATH = (
    "/Users/dmatekenya/Desktop/TMP/data/"
    "activity_log_raw.csv"
)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
from functools import wraps
import time

# Introducing the DataFrames API
In Spark, a DataFrame object consists of [Row](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Row.html) objects and [Column](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.html) objects. Concretely, each row of a Spark DataFrame is an instance of the ```pyspark.sql.Row``` class, while each column is an instance of the ```pyspark.sql.Column``` class. We will look at each of these classes in detail.

## Creating DataFrames
1. From Python objects
2. External data sources
3. Other Spark objects

### Schemas
Also, when creating DataFrames, you have the option to use a schema or not. A schema in Spark defines the column names and associated data types for a DataFrame. Most often, schemas come into play when you are reading structured data from an external data source. When a schema is not provided, Spark has to infer the data types, which can slow your application down if you have a massive dataset. Although schemas are a concept borrowed from database systems (DBMS), they offer several advantages when dealing with large datasets:
- Spark doesn't have to infer data types, so you get speed benefits.
- Without a schema, Spark creates a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming. As such, defining a schema will avoid this.
- You can detect errors early if data doesn’t match the schema.
#### Defining Schemas
- Programmatically using Spark DataTypes 
- Using Data Definition Language (DDLs)

### Spark DataFrame from Python objects

In [None]:
# Option 1: build the schema programmatically from Spark DataTypes.
# Each field is (column name, data type); the trailing False marks every
# field as non-nullable.
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("author_name", StringType()),
        ("book_title", StringType()),
        ("num_pages", IntegerType()),
    )
])

# Option 2: the same columns expressed as a DDL string (this rebinds `schema`).
schema = "author_name STRING, book_title STRING, num_pages INT"

In [None]:
# Define schema for our data using DDL
# DDL-string schema for the books example: "<column> <TYPE>" pairs, comma-separated.
schema = ", ".join(["author_name STRING", "book_title STRING", "num_pages INT"])

In [None]:
# A small, static dataset. In real life we often receive data as Python
# objects (for instance, records downloaded from a website) and want to turn
# them into Spark DataFrames.
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
        [2, "Brooke", "Wenig", "https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
        [3, "Denny", "Lee", "https://tinyurl.3", "6/7/2019", 7659, ["web", "twitter", "FB", "LinkedIn"]],
        [4, "Tathagata", "Das", "https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
        [5, "Matei", "Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
        [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]],
        ]

# Schema matching the 7 fields of each record above, in DDL form. The books
# schema defined earlier has only 3 columns, so it cannot describe this data.
blog_schema = ("`Id` INT, `First` STRING, `Last` STRING, `Url` STRING, "
               "`Published` STRING, `Hits` INT, `Campaigns` ARRAY<STRING>")

# Create (or reuse) a local SparkSession whose driver binds to 127.0.0.1.
spark = SparkSession.builder.appName("intro").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()

# Create a DataFrame using the schema defined above.
sdf = spark.createDataFrame(data, schema=blog_schema)
# Show the DataFrame; it should reflect the records above.
sdf.show()
# Print the schema Spark uses for this DataFrame. printSchema() prints
# directly and returns None, so it must not be wrapped in print().
sdf.printSchema()

### EXERCISE-1: READ CSV WITH SCHEMA
1. Use the Spark documentation to find out how to read a file with a predefined schema. 
Note that the schema is the one we already defined above. The data above has been saved as ```blog_simple_dataset.csv```. Read it as a Spark DataFrame with the schema. Answer this question in the next cell.
2. Define a schema for ```activity_log_raw.csv```; use STRING for the datetime column.
3. Load the dataset with and without a schema using the functions defined below, and compare the loading times. Answer this question by completing the functions defined below and calling them.

In [None]:
# Peek at the first 1,000 rows with pandas to see which dtypes each column
# holds; reuses the notebook-wide CSV_PATH instead of repeating the literal.
df = pd.read_csv(CSV_PATH, nrows=1000)
df.dtypes

In [None]:
def timefn(fn):
    """
    Decorate ``fn`` so that every call prints how long it took to run.

    The printed message has the form ``@timefn:<name> took <seconds> seconds``.
    The wrapped function's arguments and return value pass through unchanged,
    and ``functools.wraps`` keeps its name and docstring.
    """
    @wraps(fn)
    def measure_time(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution; time.time() follows
        # the system clock and can jump, which distorts short measurements.
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"@timefn:{fn.__name__} took {elapsed} seconds")
        return result
    return measure_time

In [None]:
@timefn
def load_with_schema(large_csv):
    """
    Read ``large_csv`` with an explicit DDL schema, show its first rows and
    return the resulting DataFrame.

    Supplying the column types up front means Spark does not need to infer
    them from the data.
    """
    # One entry per CSV column, in file order. The activity log has exactly
    # four columns: SID, ACTIVITY_ID, ACTIVITY_TIME, STATUS (see the sdf.show()
    # output elsewhere in this notebook). ACTIVITY_TIME stays a string, as the
    # exercise asks.
    schema = "`SID` INT, `ACTIVITY_ID` INT, `ACTIVITY_TIME` STRING, `STATUS` STRING"
    spark = SparkSession.builder.master("local[*]").appName("ReadWithChema").getOrCreate()
    # header=True: the first line holds column names. Without it the header
    # would be parsed as a data record (its INT fields becoming null).
    sdf = spark.read.schema(schema).csv(large_csv, header=True)
    # show() prints the rows itself and returns None, so no print() around it.
    sdf.show()
    return sdf

In [None]:
@timefn
def load_without_schema(large_csv):
    """
    Read ``large_csv`` letting Spark infer the column types, show its first
    rows and return the resulting DataFrame.

    inferSchema=True makes Spark scan the data to determine each column's
    type, which is the cost that an explicit schema avoids. Without it, every
    column is simply read as a string and the timing comparison would not
    measure schema inference at all.
    """
    spark = SparkSession.builder.master("local[*]").appName("DataFrameFromPythonObj").getOrCreate()
    sdf = spark.read.csv(large_csv, header=True, inferSchema=True)
    # show() prints the rows itself and returns None, so no print() around it.
    sdf.show()
    return sdf

In [None]:
# Time loading the activity log with an explicit schema.
load_with_schema(CSV_PATH)

In [None]:
# Time loading the activity log with schema inference.
load_without_schema(CSV_PATH)

### Spark DataFrame from external data sources
The most common way (which we have already seen) is to load data from external data sources; Spark supports numerous data stores. Spark reads data through the ```DataFrameReader``` object. Please look at the documentation [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.csv.html) to see all the data sources that the Spark ```DataFrameReader``` object supports.

In [None]:
from IPython.display import Image
# Display the image SparkConnectors.png inline in the notebook (presumably a
# diagram of Spark's supported data sources, per the text above). The path is
# relative, so the PNG must sit next to the notebook — TODO confirm it ships
# with it.
Image("SparkConnectors.png")

## Columns and Expressions in  DataFrames
In Spark DataFrames, columns behave like pandas DataFrame columns in several ways, but they also behave differently. You can list all the columns by their names, and you can perform operations on their values using relational or computational expressions. 
- [Column](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.html) is the name of the class, which has many important methods such as `alias()`, `cast()` and `isin()`, while [col()](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.col.html) is a standard built-in function that returns a Column.

We need to use the col() and expr() functions available in ```pyspark.sql.functions``` for many operations such as:
- Add, rename columns
- Subset data based on columns
- Access columns to compute stats on them
- Access columns to compute operations on them such as sorting

### Add a new column using expr and col
In order to add a new column in a Spark DataFrame, we use the ```DataFrame.withColumn(new_col_name, expression_to_compute_new_col)```

In [2]:
from pyspark.sql.functions import *

In [3]:
# Reuse the notebook-wide path constant instead of repeating the literal.
csv_fpath = CSV_PATH
spark = SparkSession.builder.master("local[*]").appName("DataFrameFromPythonObj").getOrCreate()
# No schema and no inferSchema: every column is read as a string.
sdf = spark.read.csv(csv_fpath, header=True)

22/01/18 11:55:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# use expr
sdf2 = sdf.withColumn("new_col", (expr("ACTIVITY_ID > 10000")))
sdf2.show()

In [6]:
# use the col function which I prefer over the expr col("Hits")
sdf2 = sdf.withColumn("new_col", col("ACTIVITY_ID") > 10000)
sdf2.show()

+---+-----------+--------------------+------+-------+
|SID|ACTIVITY_ID|       ACTIVITY_TIME|STATUS|new_col|
+---+-----------+--------------------+------+-------+
|584|       1291|13-APR-15 10.33.4...|     S|  false|
|584|       1286|13-APR-15 10.33.4...|     S|  false|
|584|       1285|13-APR-15 10.33.4...|     S|  false|
|584|       1284|13-APR-15 10.33.4...|     S|  false|
|584|       1288|13-APR-15 10.33.4...|     S|  false|
|584|       1293|13-APR-15 10.33.4...|     S|  false|
|344|         10|13-APR-15 10.33.3...|     N|  false|
|344|         10|13-APR-15 10.33.3...|     R|  false|
|344|         10|13-APR-15 10.33.3...|     N|  false|
|584|       1269|13-APR-15 10.33.2...|     S|  false|
|584|       1268|13-APR-15 10.33.2...|     S|  false|
|584|       1267|13-APR-15 10.33.2...|     S|  false|
|584|       1266|13-APR-15 10.33.2...|     S|  false|
|584|       1265|13-APR-15 10.33.2...|     S|  false|
|584|       1264|13-APR-15 10.33.2...|     S|  false|
|584|       1263|13-APR-15 1

### Subset data based on a few columns
In order to access a single or multiple columns, we use the ```select()``` function on the DataFrame

In [None]:
# select() returns a new DataFrame holding only the listed columns.
sdf3 = sdf.select(['ACTIVITY_TIME', 'STATUS'])
sdf3.show(n=20)

**EXERCISE-2:**

1. Check if these statements: df.select(expr("ACTIVITY_TIME")).show(2), df.select(col("ACTIVITY_TIME")).show(2)
and df.select("ACTIVITY_TIME").show(2) will provide the same output. Replace df with the name of your Spark DataFrame.

2. Create a new DataFrame using expr to get only those rows where STATUS is "S".
Note that expr() just performs the operation; it doesn't filter out the rows which evaluate to false (combine it with filter() or where() for that).
3. Sort DataFrame: use the col function to sort the DataFrame on the "SID" column.

In [None]:
# YOUR CODE

In [None]:
# SID was read without a schema, so it is a string column and sorting it
# directly would be lexicographic ("10" before "2"). Cast it to int so the
# rows are ordered numerically.
sdf.sort(col("SID").cast("int")).show()

# SPARK DATAFRAME = ROWS + COLUMNS

In [9]:
# The class of the object returned by spark.read.csv().
sdf.__class__

pyspark.sql.dataframe.DataFrame

In [14]:
# A list holding (at most) the first row of the DataFrame; take(1) is
# limit(1) followed by collect().
first_row = sdf.limit(1).collect()

In [17]:
# The collected result is a list; its single element is the Row itself.
spark_row = first_row[0]
spark_row.__class__

pyspark.sql.types.Row

### Rows
A row in Spark is a generic Row object, containing one or more columns. Each column may be of the same data type (e.g., integer or string), or they can have different types (integer, string, map, array, etc.). Because Row is an object in Spark and an ordered collection of fields, you can instantiate a Row the same way we instantiate any object. Consequently, you can collect Row objects in a list and create a Spark DataFrame.

In [18]:
from pyspark import Row
# A Row is built like any other object, from named fields.
row = Row(**{"name": "Alice", "age": 11})

In [19]:
# A plain Python list of Row objects can be turned straight into a DataFrame;
# column names come from the Row field names.
rows = [
    Row(name="Matei Zaharia", state="CA"),
    Row(name="Reynold Xin", state="CA"),
]
spark_df_from_rows = spark.createDataFrame(data=rows)
spark_df_from_rows.show(n=20)

[Stage 8:>                                                          (0 + 1) / 1]                                                                                

+-------------+-----+
|         name|state|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



**EXERCISE-3:** Creating a Spark DataFrame with Rows. Please complete the function below and call it.

In [None]:
def convert_json_to_spark_with_rows(json_file):
    """
    Load a JSON list of cities and turn it into a Spark DataFrame via Row objects.

    Each record is expected to carry ``id``, ``name``, ``country`` and a
    ``coord`` dict with ``lon``/``lat`` keys (the layout shown by
    ``pd.read_json`` further below in this notebook). The coordinates are
    flattened into separate ``lon`` and ``lat`` columns.

    Parameters
    ----------
    json_file : str
        Path to the JSON file.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame with columns id, name, country, lon, lat (also shown).
    """
    # Read the file we were given instead of relying on a global `df`.
    city_df = pd.read_json(json_file)

    # One Row per city. The explicit int()/float() conversions make values
    # match the declared INT/DOUBLE types: some coordinates are whole numbers
    # (e.g. 'lat': 28) and would otherwise stay ints.
    rows = []
    for record in city_df.to_dict(orient="records"):
        coord = record['coord']
        rows.append(Row(id=int(record['id']),
                        name=record['name'],
                        country=record['country'],
                        lon=float(coord['lon']),
                        lat=float(coord['lat'])))

    # Field order matches the keyword order used for the Row objects above.
    schema = "`id` INT, `name` STRING, `country` STRING, `lon` DOUBLE, `lat` DOUBLE"

    spark_df = spark.createDataFrame(rows, schema=schema)
    # show() prints the rows itself and returns None.
    spark_df.show()
    return spark_df

In [20]:
# Relative path to the JSON city list (id, name, country, coord) for Exercise 3.
jsonfile = "/".join(["..", "data", "city.list.json"])
# convert_json_to_spark_with_rows(jsonfile)

In [22]:
# Inspect the raw city records with pandas before converting them.
df = pd.read_json(path_or_buf=jsonfile)
df.head(n=5)

Unnamed: 0,id,name,country,coord
0,707860,Hurzuf,UA,"{'lon': 34.283333, 'lat': 44.549999}"
1,519188,Novinki,RU,"{'lon': 37.666668, 'lat': 55.683334}"
2,1283378,Gorkhā,NP,"{'lon': 84.633331, 'lat': 28}"
3,1270260,State of Haryāna,IN,"{'lon': 76, 'lat': 29}"
4,708546,Holubynka,UA,"{'lon': 33.900002, 'lat': 44.599998}"


# Common DataFrames Operations

In [None]:
# Display the first element of the module-level `rows` list.
# NOTE(review): in top-to-bottom execution `rows` is the two-element name/state
# list from the Row example above (the `rows` inside
# convert_json_to_spark_with_rows is local) — confirm this is what was meant
# to be inspected.
rows[0]

In [None]:
# Build the city DataFrame from the pandas `df` read from `jsonfile` above.
# The module-level `rows` holds the two name/state Rows from the Row example,
# which do not fit this 5-column schema, so city Rows are built here instead.
schema = "`id` INT, `name` STRING, `country` STRING, `lon` DOUBLE,`lat` DOUBLE"
city_rows = [
    Row(id=int(rec["id"]), name=rec["name"], country=rec["country"],
        lon=float(rec["coord"]["lon"]), lat=float(rec["coord"]["lat"]))
    for rec in df.to_dict(orient="records")
]
spark_df = spark.createDataFrame(city_rows, schema=schema, verifySchema=False)

In [None]:
# TO BE CONTINUED

In [None]:
# The city `df` has no top-level `lat` column: latitude lives inside each
# `coord` dict. Extract it, then call unique() (it is a method and must be
# invoked to get the array of distinct values).
df["coord"].apply(lambda c: c["lat"]).unique()

In [None]:
# Print the first 20 rows of the city DataFrame.
spark_df.show(n=20)

In [None]:
# Reuse the notebook-wide path constant instead of repeating the literal.
csv_fpath = CSV_PATH
spark = SparkSession.builder.master("local[*]").appName("DataFrameFromPythonObj").getOrCreate()
# No schema and no inferSchema: every column is read as a string.
sdf = spark.read.csv(csv_fpath, header=True)

In [None]:
# Keep only the timestamp and status columns, then print them.
sdf2 = sdf.select(['ACTIVITY_TIME', 'STATUS'])
sdf2.show(n=20)

In [None]:
# Same projection, kept as a separate DataFrame (nothing is computed yet).
sdf3 = sdf.select(*("ACTIVITY_TIME", "STATUS"))

In [None]:
# Request 1g of executor memory.
# NOTE(review): getOrCreate() returns an already-running session if one
# exists, and a context-level setting like spark.executor.memory likely won't
# change on a running context — confirm whether a restart is intended.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DataFrameFromPythonObj")
    .config('spark.executor.memory', '1g')
    .getOrCreate()
)

In [None]:
# Reuse the notebook-wide path constant instead of repeating the literal.
csv_fpath = CSV_PATH
sdf = spark.read.csv(csv_fpath, header=True)
# summary() only returns a new (lazy) DataFrame of statistics; show()
# computes and prints it. Displaying the bare result would show just its schema.
sdf.summary().show()

# How Spark SQL executes computations
Let's use the explain() function on a DataFrame to see how the Spark SQL engine plans its execution. For complicated and time-consuming jobs, understanding this logical plan before running the job is important.

### CHANGING CONFIGS USING SPARK-ENV.SH AND SPARK-DEFAULTS.CONF

In [5]:
# Session with explicit executor (2g) and driver (4g) memory settings.
# NOTE(review): memory settings are read when the context starts; if a
# session is already running, getOrCreate() returns it and these values
# likely won't apply — confirm a fresh kernel when relying on them.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DataFrameFromPythonObj")
    .config('spark.executor.memory', '2g')
    .config('spark.driver.memory', '4g')
    .getOrCreate()
)

22/01/20 09:29:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/20 09:29:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
# Read the activity log; the first line supplies the column names.
sdf = spark.read.option("header", True).csv(CSV_PATH)

In [12]:
# Number of rows for each distinct STATUS value.
sdf_by_status = sdf.groupBy(['STATUS']).count()

In [13]:
# Print the parsed, analyzed and optimized logical plans plus the physical plan.
sdf_by_status.explain(extended=True)

== Parsed Logical Plan ==
'Aggregate ['STATUS], [unresolvedalias('STATUS, None), count(1) AS count#42L]
+- Relation[SID#16,ACTIVITY_ID#17,ACTIVITY_TIME#18,STATUS#19] csv

== Analyzed Logical Plan ==
STATUS: string, count: bigint
Aggregate [STATUS#19], [STATUS#19, count(1) AS count#42L]
+- Relation[SID#16,ACTIVITY_ID#17,ACTIVITY_TIME#18,STATUS#19] csv

== Optimized Logical Plan ==
Aggregate [STATUS#19], [STATUS#19, count(1) AS count#42L]
+- Project [STATUS#19]
   +- Relation[SID#16,ACTIVITY_ID#17,ACTIVITY_TIME#18,STATUS#19] csv

== Physical Plan ==
*(2) HashAggregate(keys=[STATUS#19], functions=[count(1)], output=[STATUS#19, count#42L])
+- Exchange hashpartitioning(STATUS#19, 200), true, [id=#53]
   +- *(1) HashAggregate(keys=[STATUS#19], functions=[partial_count(1)], output=[STATUS#19, count#46L])
      +- FileScan csv [STATUS#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/dmatekenya/Desktop/TMP/data/activity_log_raw.csv], PartitionFilters: []

# The SQL API: interact with a CSV file read as DataFrame with SQL commands
As mentioned, Spark allows you to read data (e.g., a CSV file) as a DataFrame, and you can then interact with it using good old SQL commands. The following steps are required in order to do so:
1. **Create a DataFrame as required.** In our case, we will read from an external source.
2. **Create a table view.** Views are a special version of tables in SQL. They provide a virtual table environment for various complex operations. You can select data from multiple tables, or you can select specific data based on certain criteria in views. A view does not hold the actual data; it holds only the definition of the view in the data dictionary (you will learn more about this in the Database course).

Once you have a temporary view, you can issue SQL queries using Spark SQL. These queries are no different from those you might issue against a SQL table in, say, a MySQL or PostgreSQL database.

In [None]:
# Step-1: Read in data as DataFrame
# Step-1: Read in data as DataFrame (the header line supplies column names)
sdf = spark.read.format("csv").option("header", True).load(CSV_PATH)

In [16]:
# Step-2: Create a table view
# Step-2: Register the DataFrame as a temporary view that SQL can query by name
sdf.createOrReplaceTempView(name='hello_SQL_tbl')

In [18]:
# Print the first 10 rows.
sdf.show(n=10)

+---+-----------+--------------------+------+
|SID|ACTIVITY_ID|       ACTIVITY_TIME|STATUS|
+---+-----------+--------------------+------+
|584|       1291|13-APR-15 10.33.4...|     S|
|584|       1286|13-APR-15 10.33.4...|     S|
|584|       1285|13-APR-15 10.33.4...|     S|
|584|       1284|13-APR-15 10.33.4...|     S|
|584|       1288|13-APR-15 10.33.4...|     S|
|584|       1293|13-APR-15 10.33.4...|     S|
|344|         10|13-APR-15 10.33.3...|     N|
|344|         10|13-APR-15 10.33.3...|     R|
|344|         10|13-APR-15 10.33.3...|     N|
|584|       1269|13-APR-15 10.33.2...|     S|
+---+-----------+--------------------+------+
only showing top 10 rows



In [22]:
# issue the SQL query
# Issue the SQL query against the temporary view registered above.
# NOTE(review): ACTIVITY_TIME was read as a string, so ORDER BY sorts it
# lexicographically (by the leading day-of-month text), not chronologically;
# parse it into a timestamp if time order is what's wanted.
spark.sql(
    sqlQuery="""SELECT STATUS, ACTIVITY_TIME FROM hello_SQL_tbl WHERE STATUS == 'S' ORDER BY ACTIVITY_TIME DESC"""
).show(n=10)



+------+--------------------+
|STATUS|       ACTIVITY_TIME|
+------+--------------------+
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
|     S|31-OCT-15 12.48.3...|
+------+--------------------+
only showing top 10 rows



                                                                                

In [23]:
# Load only the first 10,000 rows into pandas for a quick local look.
df = pd.read_csv(filepath_or_buffer=CSV_PATH, nrows=10_000)


In [27]:
# Summary statistics (count, mean, std, quartiles) of the SID column.
df["SID"].describe()

count    10000.000000
mean       390.536400
std        204.507279
min          1.000000
25%        236.000000
50%        471.000000
75%        584.000000
max        814.000000
Name: SID, dtype: float64