# **Introduction to DataFrames**


## Objectives

A DataFrame is a two-dimensional, labeled data structure whose columns can hold different data types. DataFrames can be built from many kinds of input, including Series and other DataFrames. You can optionally pass an index (row labels) and columns (column labels). Index labels can be numbers, dates, or strings/tuples.
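
Here is a minimal pandas sketch of these ideas: columns of different types, a DataFrame built from Series, and explicit row and column labels. The three rows are copied from the mtcars preview later in this notebook, and the variable names are illustrative.

```python
import pandas as pd

# Two Series that share the same string index (row labels)
cars = ["Mazda RX4", "Datsun 710", "Hornet 4 Drive"]
mpg = pd.Series([21.0, 22.8, 21.4], index=cars)
cyl = pd.Series([6, 4, 6], index=cars)

# Build a DataFrame from a dict of Series; the dict keys become the column labels
df = pd.DataFrame({"mpg": mpg, "cyl": cyl})

# Passing `columns=` selects and orders the columns explicitly
df_reordered = pd.DataFrame({"mpg": mpg, "cyl": cyl}, columns=["cyl", "mpg"])

print(df.dtypes)                      # mpg is float64, cyl is int64
print(df_reordered.columns.tolist())  # ['cyl', 'mpg']
```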

After going through this notebook, you will know how to:

*   Load a data file into a DataFrame
*   View the data schema of a DataFrame
*   Perform basic data manipulation
*   Aggregate data in a DataFrame

## Setup

We use pandas to load a CSV file from disk into an in-memory pandas DataFrame. PySpark is the Spark API for Python; we use it to initialize the Spark context.

In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pandas

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=811a7a23d871e2be9550dd2ba8023a02e546c8120f324c270b94555c2f9faaa7
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Locate the local Spark installation and make PySpark importable
import findspark
findspark.init()

In [None]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Exercise 1 - Spark session

Create and initialize the Spark session needed to load the DataFrames and operate on them.

#### Task 1: Creating the Spark session and context

In [None]:
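# Create a SparkContext, the entry point to Spark
sc = SparkContext()

# Create a SparkSession; getOrCreate() reuses the SparkContext created above.
# The .config() line is only a placeholder showing how to set an option.
spark = SparkSession \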
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#### Task 2: Verify the Spark session

Before working with DataFrames, verify that the Spark session instance has been created. Evaluating `spark` in a cell displays a summary of the session.

In [None]:
spark

## Exercise 2 - Load the data into a Spark DataFrame

Read the CSV file into a pandas DataFrame, then convert it into a Spark DataFrame.

To create the Spark DataFrame we load an external dataset called mtcars. It contains 32 observations on 11 variables:

| colIndex | colName | units/description                        |
| :------: | :------ | :--------------------------------------- |
|   [, 1]  | mpg     | Miles per gallon                         |
|   [, 2]  | cyl     | Number of cylinders                      |
|   [, 3]  | disp    | Displacement (cu.in.)                    |
|   [, 4]  | hp      | Gross horsepower                         |
|   [, 5]  | drat    | Rear axle ratio                          |
|   [, 6]  | wt      | Weight (1000 lb)                         |
|   [, 7]  | qsec    | 1/4 mile time (seconds)                  |
|   [, 8]  | vs      | Engine (0 = V-shaped, 1 = straight)      |
|   [, 9]  | am      | Transmission (0 = automatic, 1 = manual) |
|   [,10]  | gear    | Number of forward gears                  |
|   [,11]  | carb    | Number of carburetors                    |
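
The `vs` and `am` columns are stored as 0/1 codes. The pandas sketch below maps them to readable labels using the standard mtcars coding (vs: 0 = V-shaped, 1 = straight engine; am: 0 = automatic, 1 = manual). It uses the first five rows shown later in this notebook. The new column names `engine` and `transmission` are our own.

```python
import pandas as pd

# First five rows of mtcars (values copied from the preview in this notebook)
df = pd.DataFrame(
    {"vs": [0, 0, 1, 1, 0], "am": [1, 1, 1, 0, 0]},
    index=["Mazda RX4", "Mazda RX4 Wag", "Datsun 710", "Hornet 4 Drive", "Hornet Sportabout"],
)

# Replace the 0/1 codes with readable labels
df["engine"] = df["vs"].map({0: "V-shaped", 1: "straight"})
df["transmission"] = df["am"].map({0: "automatic", 1: "manual"})
print(df)
```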


#### Task 1: Loading data into a Pandas DataFrame

In [None]:
# Read the file into a pandas DataFrame using the `read_csv` function
mtcars = pd.read_csv('mtcars.csv')

In [None]:
# Preview a few records
mtcars.head()

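|   | Unnamed: 0 | mpg | cyl | disp | hp | drat | wt | qsec | vs | am | gear | carb |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Mazda RX4 | 21.0 | 6 | 160.0 | 110 | 3.9 | 2.62 | 16.46 | 0 | 1 | 4 | 4 |
| 1 | Mazda RX4 Wag | 21.0 | 6 | 160.0 | 110 | 3.9 | 2.875 | 17.02 | 0 | 1 | 4 | 4 |
| 2 | Datsun 710 | 22.8 | 4 | 108.0 | 93 | 3.85 | 2.32 | 18.61 | 1 | 1 | 4 | 1 |
| 3 | Hornet 4 Drive | 21.4 | 6 | 258.0 | 110 | 3.08 | 3.215 | 19.44 | 1 | 0 | 3 | 1 |
| 4 | Hornet Sportabout | 18.7 | 8 | 360.0 | 175 | 3.15 | 3.44 | 17.02 | 0 | 0 | 3 | 2 |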
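
The `Unnamed: 0` column holds the car names. pandas assigns that name when the first header cell in the CSV is empty. The sketch below shows two ways to handle it, using a two-row excerpt that assumes the real `mtcars.csv` has this layout. The `model` column name is hypothetical.

```python
import io
import pandas as pd

# Two mtcars rows in the assumed layout of mtcars.csv (empty first header cell)
csv_text = """,mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb
Mazda RX4,21.0,6,160.0,110,3.9,2.62,16.46,0,1,4,4
Datsun 710,22.8,4,108.0,93,3.85,2.32,18.61,1,1,4,1
"""

# Default: the unnamed first column becomes an ordinary column called "Unnamed: 0"
default_df = pd.read_csv(io.StringIO(csv_text))

# index_col=0: use the first column (the car names) as the row index instead
indexed_df = pd.read_csv(io.StringIO(csv_text), index_col=0)

# Alternatively, keep it as a column but give it a readable name
renamed_df = default_df.rename(columns={"Unnamed: 0": "model"})
print(renamed_df.head())
```

`spark.createDataFrame()` converts only the DataFrame's columns and ignores its pandas index. If you use `index_col=0` and still want the car names in Spark, call `reset_index()` before converting.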


#### Task 2: Loading data into a Spark DataFrame


In [None]:
# We use the `createDataFrame` function to load the data into a spark dataframe
sdf = spark.createDataFrame(mtcars) 

In [None]:
# Print the schema (column names and types) inferred for the Spark DataFrame
sdf.printSchema()

root
 |-- Unnamed: 0: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: long (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: long (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: long (nullable = true)
 |-- am: long (nullable = true)
 |-- gear: long (nullable = true)
 |-- carb: long (nullable = true)
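
The schema shows how Spark inferred types from the pandas columns. The `int64` columns became `long`, the `float64` columns became `double`, and the text column became `string`. The hypothetical helper below sketches that mapping for these basic dtypes only. It is an illustrative approximation, not Spark's actual inference code, and other dtypes may map differently.

```python
import pandas as pd
from pandas.api import types as ptypes

def spark_type_name(dtype) -> str:
    """Hypothetical helper: approximate the Spark SQL type name inferred
    for a pandas column of this dtype (basic dtypes only)."""
    if ptypes.is_bool_dtype(dtype):
        return "boolean"
    if ptypes.is_integer_dtype(dtype):
        return "long"     # int64 -> LongType
    if ptypes.is_float_dtype(dtype):
        return "double"   # float64 -> DoubleType
    return "string"       # text columns (object/str dtype) -> StringType

# One mtcars-like row with the same dtypes as the loaded file
df = pd.DataFrame({"Unnamed: 0": ["Mazda RX4"], "mpg": [21.0], "cyl": [6]})
inferred = {col: spark_type_name(dt) for col, dt in df.dtypes.items()}
print(inferred)
```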



## Exercise 3: Basic data analysis and manipulation

Preview the data, then apply some filtering and column-wise operations.


#### Task 1: Display the contents of the DataFrame

We use the `show()` method for this. Here we preview the first 5 records; compare this with the similar `head()` method in pandas.

In [None]:
sdf.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|       Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



We use the `select()` function to select a particular column of data. Here we show the `mpg` column.


In [None]:
sdf.select('mpg').show(5)

+----+
| mpg|
+----+
|21.0|
|21.0|
|22.8|
|21.4|
|18.7|
+----+
only showing top 5 rows
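
For comparison, here is a pandas sketch of the same column selection on a few rows. Double brackets return a one-column DataFrame (like `sdf.select('mpg')`). Single brackets return a Series. A list picks several columns, as `sdf.select('mpg', 'cyl')` does in Spark.

```python
import pandas as pd

# A few mtcars rows (values copied from the outputs in this notebook)
df = pd.DataFrame({
    "mpg": [21.0, 21.0, 22.8],
    "cyl": [6, 6, 4],
    "hp": [110, 110, 93],
})

mpg_col = df[["mpg"]]          # double brackets: a one-column DataFrame
mpg_series = df["mpg"]         # single brackets: a Series
subset = df[["mpg", "cyl"]]    # several columns at once
print(subset.head())
```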



#### Task 2: Filtering and Columnar operations

Filtering and column operations are important for selecting relevant data and applying useful transformations.

We first filter to only retain rows with mpg < 18. We use the `filter()` function for this.

In [None]:
sdf.filter(sdf['mpg'] < 18).show(5)

+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Unnamed: 0| mpg|cyl| disp| hp|drat|  wt| qsec| vs| am|gear|carb|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
| Duster 360|14.3|  8|360.0|245|3.21|3.57|15.84|  0|  0|   3|   4|
|  Merc 280C|17.8|  6|167.6|123|3.92|3.44| 18.9|  1|  0|   4|   4|
| Merc 450SE|16.4|  8|275.8|180|3.07|4.07| 17.4|  0|  0|   3|   3|
| Merc 450SL|17.3|  8|275.8|180|3.07|3.73| 17.6|  0|  0|   3|   3|
|Merc 450SLC|15.2|  8|275.8|180|3.07|3.78| 18.0|  0|  0|   3|   3|
+-----------+----+---+-----+---+----+----+-----+---+---+----+----+
only showing top 5 rows
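
Here is the same filter written as a pandas boolean mask, on a handful of rows taken from the outputs in this notebook. Combine conditions with `&` (and) or `|` (or), and wrap each condition in its own parentheses. Spark Column expressions follow the same rules, for example `sdf.filter((sdf['mpg'] < 18) & (sdf['cyl'] == 8))`. Spark's `filter()` also accepts a SQL expression string such as `sdf.filter('mpg < 18')`.

```python
import pandas as pd

df = pd.DataFrame(
    {"mpg": [21.0, 22.8, 14.3, 17.8, 16.4], "cyl": [6, 4, 8, 6, 8]},
    index=["Mazda RX4", "Datsun 710", "Duster 360", "Merc 280C", "Merc 450SE"],
)

# Boolean mask: the pandas counterpart of sdf.filter(sdf['mpg'] < 18)
low_mpg = df[df["mpg"] < 18]

# Combine conditions with & and |; each condition needs its own parentheses
low_mpg_8cyl = df[(df["mpg"] < 18) & (df["cyl"] == 8)]
print(low_mpg_8cyl)
```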



Operating on Columns

Spark also provides a number of functions that can be applied directly to columns for data processing and aggregation. The example below uses basic arithmetic to convert the weight values from thousands of pounds (the unit of `wt`) to metric tons, using the approximate factor 0.45.
We create a new column called `wtTon` that holds the weight from the `wt` column converted to metric tons.


In [None]:
sdf.withColumn('wtTon', sdf['wt'] * 0.45).show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|       Unnamed: 0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|  wtTon|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|  1.179|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|1.29375|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|  1.044|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|1.44675|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|  1.548|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+-------+
only showing top 5 rows
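
Here is a quick check of the conversion factor. `wt` is recorded in thousands of pounds, and one pound is defined as exactly 0.45359237 kg, so 1,000 lb is about 0.4536 metric tons. The factor 0.45 used above therefore reads about 0.8% low. This is why the Mazda RX4 row shows 1.179 rather than roughly 1.188.

```python
# wt is recorded in units of 1000 lb
LB_TO_KG = 0.45359237          # exact definition of the international pound
wt_thousand_lb = 2.62          # Mazda RX4, from the preview above

exact_tons = wt_thousand_lb * 1000 * LB_TO_KG / 1000   # lb -> kg -> metric tons
approx_tons = wt_thousand_lb * 0.45                    # factor used in the notebook

print(round(exact_tons, 4), round(approx_tons, 4))
```

To use the exact factor in Spark, replace `0.45` with `0.45359237` in the `withColumn` call.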



## Exercise 4: Grouping and Aggregation

Spark DataFrames support a number of commonly used functions to aggregate data after grouping. In this example we compute the average weight of cars grouped by their number of cylinders.


In [None]:
sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show(5)

+---+-----------------+
|cyl|          avg(wt)|
+---+-----------------+
|  6|3.117142857142857|
|  8|3.999214285714286|
|  4|2.285727272727273|
+---+-----------------+
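
Here is a pandas sketch of the same aggregation. It uses only the first five mtcars rows, so its averages differ from the full-data output above. Named aggregation gives the result columns readable names instead of `avg(wt)`. The names `avg_wt` and `n_cars` are our own.

```python
import pandas as pd

# First five mtcars rows (values copied from the preview in this notebook)
df = pd.DataFrame({
    "cyl": [6, 6, 4, 6, 8],
    "wt": [2.62, 2.875, 2.32, 3.215, 3.44],
})

# pandas counterpart of sdf.groupby(['cyl']).agg({"wt": "AVG"})
avg_wt = df.groupby("cyl")["wt"].mean()

# Named aggregation: choose the output column names explicitly
summary = df.groupby("cyl").agg(avg_wt=("wt", "mean"), n_cars=("wt", "count"))
print(summary)
```

In Spark, you can get a similar effect with `pyspark.sql.functions`. Import it with `from pyspark.sql import functions as F`, then call `sdf.groupBy('cyl').agg(F.avg('wt').alias('avg_wt'))`. Note also that pandas sorts the group keys by default, whereas Spark does not guarantee the order of aggregated rows (the output above lists 6, 8, 4).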



We can also sort the output of an aggregation. Here we count the cars in each cylinder group and sort the counts to find the most common cylinder configurations.

In [None]:
# show() prints the rows and returns None, so keep the DataFrame in a variable first
car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)
car_counts.show(5)


+---+---------+
|cyl|count(wt)|
+---+---------+
|  8|       14|
|  4|       11|
|  6|        7|
+---+---------+
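
Counting rows per group is common enough that both libraries have shortcuts. In Spark, `sdf.groupBy('cyl').count()` returns a column named `count` that can be sorted with `.orderBy('count', ascending=False)`. The pandas sketch below, again using only the first five mtcars rows, shows `value_counts()` and the equivalent `groupby().size()` form.

```python
import pandas as pd

# cyl values of the first five mtcars rows
df = pd.DataFrame({"cyl": [6, 6, 4, 6, 8]})

# value_counts() counts each distinct value, sorted by count (descending)
counts = df["cyl"].value_counts()

# Equivalent: group, count the rows per group, then sort the counts
counts_sorted = df.groupby("cyl").size().sort_values(ascending=False)
print(counts_sorted)
```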

