In [1]:
import numpy as np
import pandas as pd
import datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

## Setup

Before doing anything, please run the following cell which will make a CSV file called "data.csv" in this directory and which creates a local spark session.

## TODO: 
1. Put some NaNs into the DF.

In [2]:
# Run me!

# Creates data
!python ./data_setup.py  

# Creates a local spark session.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark Tutorial") \
    .getOrCreate()

---

## Section 1: Getting and Querying the Data

Here we're going to make a Spark Context.  If you need to restart it, go into "Kernel > Restart Kernel."  In general, if anything bad happens spark-related, you're going to want to do that reset kernel thing.

Problems will be given by section as below.  Note that you *will have to import some new modules from pyspark; not all required imports are given above*.

---

### 1.1 Importing with inferSchema

Import the `data.csv` file.  Use `inferSchema=True` to infer the schema.  Be sure that you can print the head of the rdd as well as printing out the schema using `.printSchema()`.

---

### 1.2 Importing with Structs

Import the `data.csv` file (again).  Explicitly use StructFields to create the schema.  When would you want to explicitly define the schema types?

---

### 1.3 Using Select

Select only `datetime_col, float_col, categorical_col`. Print 25 records without Python truncating the dataset.  Hint: for the last part, you will use a parameter in the .show() method.

### 1.4 Some Example Queries of the Data

1. Select only those values whose `categorical_col` is `Low` and the `int_col` value is negative.
2. Group by categoricals, giving the sum of the `int_col` and the average of the `int_col` as new columns.
3. Group by categoricals, giving the sum of the `int_col` and the average of the `int_col` as new columns, and show only the ones having an average greater than 0.
4. Count the number of times each category comes up in the `categorical_col` column.  The result should be two columns: the category and the count.

---

## Section 2: Manipulating RDDs

This section will describe how to using maps, casting, etc., to transform your data or creat new columns.

### 2.1 Casting

1. Cast `int_col` as DoubleType and check the dtypes of the RDD. (Cast them back when you're done!)

### 2.2 Mapping

1. Create a small list of first names.  Use these to create a Spark Data Frame which has a single column.  (Hint: this is sort of a strange one with a single column, if you can't get it you can check the solutions for one way to do it.)

2. Create a small list of first names and last names.  Use these to create a Spark Data Frame which has two columns (first, last). 

3. Let's use the Spark DataFrame from the previous question (2.2.2).  Create a new column which is the concatenation of the first letters of the first and last name.  For example, "Kara", "Williams" would give the value "KW".

In [27]:
# Delete this stuff after this

file_loc = "./data.csv"
sdf = spark.read.format("csv") \
        .option("header", True) \
        .option("inferSchema", True) \
        .load(file_loc)


pyspark.sql.dataframe.DataFrame

In [39]:
# Casting

# Cast integer column as DoubleType and check the dtypes of the RDD.
# (Cast them back when you're done!)
from pyspark.sql.types import (DoubleType, StringType, IntegerType, FloatType,
                               StringType, BooleanType)
rdd = rdd.withColumn("int_col", rdd["int_col"].cast(DoubleType()))
print(pd.DataFrame(rdd.dtypes))

rdd = rdd.withColumn("int_col", rdd["int_col"].cast(IntegerType()))

                 0          1
0     datetime_col  timestamp
1        float_col     double
2          int_col     double
3  categorical_col     string
4         bool_col    boolean


In [35]:
print(pd.DataFrame(rdd.dtypes))

                 0          1
0     datetime_col  timestamp
1        float_col     double
2          int_col     double
3  categorical_col     string
4         bool_col    boolean


In [62]:
# Create list of names.
names = [("Albert", "Sampson"), ("Beth", "Schwartz"), ("Carrie", "Masters"),
         ("Doug", "Elliot"), ("Elie", "Forest")]

# Pass the list of tuples in, the second arg is for column names.
sdf_names = spark.createDataFrame(names, ["first_name", "last_name"])
sdf_names.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Albert|  Sampson|
|      Beth| Schwartz|
|    Carrie|  Masters|
|      Doug|   Elliot|
|      Elie|   Forest|
+----------+---------+



In [92]:
# We need to explicitly call "rdd" here as only RDDs have the mapping
# method, not Spark DataFrames.  Remember the new value is either in a Row or a tuple.
def make_initials(first, last):
    return (first[0] + last[0],)

# Make new column.
initials_col = F.column(sdf_names.rdd.map(lambda x: make_initials(x[0], x[1])).toDF(["initials"]))
sdf_names.withColumn("initials", initials_col)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'