## Creating Spark DataFrames

A PySpark SQL DataFrame is a distributed collection of data organized into named columns. Under the hood, DataFrames are built on top of RDDs (Resilient Distributed Datasets).

### `rdd.toDF()`
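The `toDF()` method converts an RDD to a DataFrame. It becomes available on RDDs once a `SparkSession` exists, and it works on RDDs whose elements are `Row` objects, tuples, or lists. Passing a list of names, as below, sets the column names; the column types are inferred from the data.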

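```python
from pyspark.sql import SparkSession

# Reuse the active SparkSession, or start one if none exists
spark = SparkSession.builder.getOrCreate()

# Create an RDD from a list
hrly_views_rdd = spark.sparkContext.parallelize([
    ["Betty_White", 288886],
    ["Main_Page", 139564],
    ["New_Year's_Day", 7892],
    ["ABBA", 8154]
])

# Convert RDD to DataFrame
hrly_views_df = hrly_views_rdd\
    .toDF(["article_title", "view_count"])
```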
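
When no schema is given, PySpark infers column types from the data: by default it inspects the first row (and a few more when that row contains nulls) and maps each Python value to a Spark SQL type. The plain-Python sketch below (no Spark required) mimics that name-and-type pairing under the assumption that rows contain only the four scalar types listed; `infer_schema` and `_TYPE_NAMES` are hypothetical helpers, not PySpark API.

```python
from typing import Any, Sequence

# Python type -> Spark SQL type name (simplified; assumes only these scalar types occur)
_TYPE_NAMES = {bool: "boolean", int: "long", float: "double", str: "string"}


def infer_schema(rows: Sequence[Sequence[Any]],
                 column_names: Sequence[str]) -> list[tuple[str, str]]:
    """Pair each column name with a type name inferred from the first row."""
    first = rows[0]
    if len(first) != len(column_names):
        raise ValueError(f"{len(column_names)} names given for {len(first)} values")
    return [(name, _TYPE_NAMES[type(value)]) for name, value in zip(column_names, first)]


views = [["Betty_White", 288886], ["Main_Page", 139564]]
print(infer_schema(views, ["article_title", "view_count"]))
# [('article_title', 'string'), ('view_count', 'long')]
```

Because Python integers map to `long`, `printSchema()` on this DataFrame should report `view_count` as `long` rather than `integer`.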

### `DataFrame.show()`

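The `show()` method prints the first `n` rows of the DataFrame as a table (20 by default). By default it also truncates strings longer than 20 characters and right-aligns the cells; `truncate=False` prints full values, left-aligned.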

```python
hrly_views_df.show(4, truncate=False)
```

```text
+--------------+----------+
|article_title |view_count|
+--------------+----------+
|Betty_White   |288886    |
|Main_Page     |139564    |
|New_Year's_Day|7892      |
|ABBA          |8154      |
+--------------+----------+
```
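
To see why `truncate=False` matters, here is a plain-Python sketch of the per-cell rule `show()` applies, based on our reading of Spark's `showString` implementation: a value longer than the limit keeps its first `limit - 3` characters followed by `...`. `format_cell` is a hypothetical helper, not part of PySpark.

```python
def format_cell(value: object, width: int, truncate: "bool | int" = True) -> str:
    """Truncate and pad one cell roughly the way DataFrame.show() does (simplified)."""
    # PySpark treats truncate=True as a 20-character limit and truncate=False as no limit
    limit = 20 if truncate is True else int(truncate)
    text = str(value)
    if limit > 0 and len(text) > limit:
        # Keep limit-3 characters and append "..." (no ellipsis for limits under 4)
        text = text[:limit] if limit < 4 else text[: limit - 3] + "..."
    # Cells are right-aligned when truncating and left-aligned when truncate=False
    return text.rjust(width) if limit > 0 else text.ljust(width)


print(repr(format_cell("Peel_Session_(Autechre_EP)", 20)))  # 'Peel_Session_(Aut...'
print(repr(format_cell(288886, 10, truncate=False)))        # '288886    '
```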

### `DataFrame.rdd`

The `rdd` property returns the DataFrame's contents as an RDD of `Row` objects.

```python
# Access DataFrame's underlying RDD
hrly_views_df_rdd = hrly_views_df.rdd

# Check object type
print(type(hrly_views_df_rdd))
# <class 'pyspark.rdd.RDD'>
```
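
Each element of `hrly_views_df.rdd` is a `pyspark.sql.Row`, a tuple subclass whose fields can be read by name or by position, so RDD transformations such as `map()` receive whole rows. The sketch below assumes a `collections.namedtuple` (hypothetically named `ViewRow`) as a stand-in for `Row` so that it runs without Spark, and uses Python's built-in `map` and `functools.reduce` in place of `RDD.map` and `RDD.reduce`.

```python
from collections import namedtuple
from functools import reduce

# Stand-in for pyspark.sql.Row: a tuple with named fields (hypothetical name ViewRow)
ViewRow = namedtuple("ViewRow", ["article_title", "view_count"])

rows = [
    ViewRow("Betty_White", 288886),
    ViewRow("Main_Page", 139564),
    ViewRow("New_Year's_Day", 7892),
    ViewRow("ABBA", 8154),
]

# Like rdd.map(lambda row: row.view_count): read a field by name
counts = list(map(lambda row: row.view_count, rows))

# Like rdd.reduce(lambda a, b: a + b): combine the values pairwise
total_views = reduce(lambda a, b: a + b, counts)

# Positional access also works, since each row is a tuple
first_title = rows[0][0]
print(counts, total_views, first_title)
# [288886, 139564, 7892, 8154] 444496 Betty_White
```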

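## Spark DataFrames from External Data Sources

`spark.read` returns a `DataFrameReader`, which loads data from external sources such as CSV, JSON, or Parquet files into a DataFrame. Its `option()` calls configure the reader: here `header` takes the column names from the first line, `delimiter` sets the field separator to a space, and `inferSchema` asks Spark to scan the data and guess column types instead of reading every column as a string.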

```python
print(type(spark.read))
# <class 'pyspark.sql.readwriter.DataFrameReader'>

# Read a space-delimited CSV file with a header row into a DataFrame
hrly_views_df = spark.read\
    .option('header', True)\
    .option('delimiter', ' ')\
    .option('inferSchema', True)\
    .csv('views_2022_01_01_000000.csv')
```
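
With `inferSchema` enabled, Spark makes an extra pass over the file and, for each column, picks the narrowest type that fits every value. The plain-Python sketch below illustrates that idea under simplifying assumptions: it only distinguishes `integer`, `long`, `double`, and `string` (Spark also recognizes booleans and timestamps, among others), and `infer_csv_schema` is a hypothetical helper.

```python
import csv
import io
from typing import Optional

_INT32_MIN, _INT32_MAX = -(2 ** 31), 2 ** 31 - 1
_WIDENING_ORDER = ["integer", "long", "double", "string"]  # narrowest to widest


def _value_type(text: str) -> str:
    """Return the narrowest type name that can represent one CSV field."""
    try:
        number = int(text)
        return "integer" if _INT32_MIN <= number <= _INT32_MAX else "long"
    except ValueError:
        pass
    try:
        float(text)
        return "double"
    except ValueError:
        return "string"


def infer_csv_schema(text: str, delimiter: str = " ") -> list[tuple[str, str]]:
    """Read a header line, then widen each column's type to fit all of its values."""
    reader = csv.reader(io.StringIO(text), delimiter=delimiter)
    header = next(reader)
    widest: list[Optional[str]] = [None] * len(header)
    for row in reader:
        for i, value in enumerate(row[: len(header)]):
            candidate = _value_type(value)
            current = widest[i]
            if current is None or _WIDENING_ORDER.index(candidate) > _WIDENING_ORDER.index(current):
                widest[i] = candidate
    # Columns that never received a value default to string
    return [(name, kind or "string") for name, kind in zip(header, widest)]


sample = (
    "language_code article_title hourly_count monthly_count\n"
    "en Ohio 1 0\n"
    "kw.m Lithouani 2 0\n"
)
print(infer_csv_schema(sample))
# [('language_code', 'string'), ('article_title', 'string'), ('hourly_count', 'integer'), ('monthly_count', 'integer')]
```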

## Inspecting and Cleaning Data with PySpark

### `DataFrame.printSchema()`

The `printSchema()` method prints the DataFrame's schema as a tree, listing each column's name, data type, and whether it can contain nulls.

```python
# Display DataFrame schema
hrly_views_df.printSchema()
```

```text
root
|-- language_code: string (nullable = true)
|-- article_title: string (nullable = true)
|-- hourly_count: integer (nullable = true)
|-- monthly_count: integer (nullable = true)
```

### `DataFrame.describe()`

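The `describe()` method returns a new DataFrame of summary statistics (count, mean, stddev, min, and max) for each numeric and string column. For string columns, mean and stddev are null, and min and max are compared lexicographically.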

```python
hrly_views_df_desc = hrly_views_df.describe()
hrly_views_df_desc.show(truncate=False)
```

```text
+-------+-------------+-------------+------------+-------------+
|summary|language_code|article_title|hourly_count|monthly_count|
+-------+-------------+-------------+------------+-------------+
|  count|      4654091|      4654091|     4654091|      4654091|
|   mean|         null|         null|     4.52417|          0.0|
| stddev|         null|         null|   182.92502|          0.0|
|    min|           aa|            -|           1|            0|
|    max|       zu.m.d|            -|      288886|            0|
+-------+-------------+-------------+------------+-------------+
```
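
The statistics in this table are ordinary aggregates: `count` skips nulls, `stddev` is the sample standard deviation, and `min`/`max` on string columns compare values lexicographically. The sketch below recomputes them for a single column in plain Python with the `statistics` module; `describe_column` is a hypothetical helper, and unlike `describe()`, which returns every statistic as a string, it returns Python values.

```python
import statistics
from typing import Optional, Sequence, Union

Value = Optional[Union[int, float, str]]


def describe_column(values: Sequence[Value]) -> dict[str, object]:
    """Summary statistics for one column, ignoring None the way SQL aggregates skip nulls."""
    present = [v for v in values if v is not None]
    numeric = bool(present) and all(isinstance(v, (int, float)) for v in present)
    return {
        "count": len(present),
        "mean": statistics.fmean(present) if numeric else None,
        # Sample standard deviation needs at least two values
        "stddev": statistics.stdev(present) if numeric and len(present) > 1 else None,
        "min": min(present) if present else None,
        "max": max(present) if present else None,
    }


print(describe_column([2, 16, None, 1, 1]))
print(describe_column(["en", "aa", "zu.m.d"]))
```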

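### `DataFrame.drop()`

The `drop()` method returns a new DataFrame without the given columns. Every value in `monthly_count` is 0 (its min and max above are both 0), so the column carries no information and can be dropped.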

```python
# Drop `monthly_count` and display new DataFrame
hrly_views_df = hrly_views_df.drop('monthly_count')
hrly_views_df.show(5, truncate=False)
```

```text
+-------------+---------------------------+------------+
|language_code|article_title              |hourly_count|
+-------------+---------------------------+------------+
|en           |Cividade_de_Terroso        |           2|
|en           |Peel_Session_(Autechre_EP) |           2|
|en           |Young_Street_Bridge        |           1|
|en           |Troy,_Alabama              |           1|
|en           |Charlotte_Johnson_Wahl     |          10|
+-------------+---------------------------+------------+
```

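### `DataFrame.withColumnRenamed()`

The `withColumnRenamed()` method returns a new DataFrame with an existing column renamed; the original DataFrame is unchanged.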

```python
# Rename into a new variable so that later examples can still use `article_title`
renamed_views_df = hrly_views_df\
    .withColumnRenamed('article_title', 'page_title')

# Display DataFrame schema
renamed_views_df.printSchema()
```

```text
root
|-- language_code: string (nullable = true)
|-- page_title: string (nullable = true)
|-- hourly_count: integer (nullable = true)
```
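
Both `drop()` and `withColumnRenamed()` return a new DataFrame and leave the original untouched, and both are no-ops when the named column does not exist rather than raising an error. The plain-Python sketch below models a DataFrame as a list of dicts to show those two properties; `drop_columns` and `rename_column` are hypothetical helpers.

```python
from typing import Any

Record = dict[str, Any]


def drop_columns(rows: list[Record], *names: str) -> list[Record]:
    """Return new records without `names`; unknown names are silently ignored."""
    return [{key: value for key, value in row.items() if key not in names} for row in rows]


def rename_column(rows: list[Record], existing: str, new: str) -> list[Record]:
    """Return new records with `existing` renamed to `new`, keeping column order."""
    return [{(new if key == existing else key): value for key, value in row.items()}
            for row in rows]


views = [{"language_code": "en", "article_title": "Ohio",
          "hourly_count": 1, "monthly_count": 0}]
slim = drop_columns(views, "monthly_count", "no_such_column")
renamed = rename_column(slim, "article_title", "page_title")
print(list(renamed[0]))  # ['language_code', 'page_title', 'hourly_count']
print(list(views[0]))    # ['language_code', 'article_title', 'hourly_count', 'monthly_count']
```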

## Querying PySpark DataFrames

### `DataFrame.filter()`

The `filter()` method keeps only the rows that satisfy the given condition (`where()` is an alias). Analogous to the SQL `WHERE` clause.

```python
hrly_views_df\
    .filter(hrly_views_df.language_code == "kw.m")\
    .show(truncate=False)
```

```text
+-------------+-----------------------+------------+
|language_code|article_title          |hourly_count|
+-------------+-----------------------+------------+
|kw.m         |Bresel_Diabarth_Spayn  |1           |
|kw.m         |Can_an_Pescador_Kernûak|1           |
|kw.m         |Ferdinand_Magellan     |1           |
|kw.m         |Justė_Arlauskaitė      |16          |
|kw.m         |Lithouani              |2           |
|kw.m         |Nolwenn_Leroy          |1           |
|kw.m         |Ohio                   |1           |
|kw.m         |Taywan                 |1           |
+-------------+-----------------------+------------+
```
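
The condition `hrly_views_df.language_code == "kw.m"` is not evaluated to `True` or `False` in Python: PySpark overloads `==` on `Column` objects to build an expression that Spark evaluates later for each row. The toy classes below (hypothetical `Col` and `Predicate`, not PySpark's implementation) sketch that operator-overloading idea in plain Python.

```python
from typing import Any, Callable

Record = dict[str, Any]


class Predicate:
    """A deferred row condition plus a SQL-like description of it."""

    def __init__(self, description: str, evaluate: Callable[[Record], bool]) -> None:
        self.description = description
        self.evaluate = evaluate


class Col:
    """Toy column reference whose comparison operators build Predicates."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, other: Any) -> Predicate:  # type: ignore[override]
        return Predicate(f"({self.name} = {other!r})", lambda row: row[self.name] == other)

    def __gt__(self, other: Any) -> Predicate:
        return Predicate(f"({self.name} > {other!r})", lambda row: row[self.name] > other)


def filter_rows(rows: list[Record], condition: Predicate) -> list[Record]:
    """Keep the rows for which the deferred condition holds."""
    return [row for row in rows if condition.evaluate(row)]


rows = [
    {"language_code": "kw.m", "article_title": "Ohio", "hourly_count": 1},
    {"language_code": "en", "article_title": "Ohio", "hourly_count": 1},
    {"language_code": "kw.m", "article_title": "Lithouani", "hourly_count": 2},
]
condition = Col("language_code") == "kw.m"
print(condition.description)                                   # (language_code = 'kw.m')
print([r["article_title"] for r in filter_rows(rows, condition)])  # ['Ohio', 'Lithouani']
```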

### `DataFrame.select()`

The `select()` method returns a new DataFrame containing only the given columns or column expressions. Analogous to the SQL `SELECT` statement.

### `DataFrame.orderBy()`

The `orderBy()` method sorts the DataFrame by the given column(s), in ascending order by default; pass `ascending=False` to sort in descending order. Analogous to the SQL `ORDER BY` clause.

```python
hrly_views_df\
    .filter(hrly_views_df.language_code == "kw.m")\
    .select(['language_code', 'article_title', 'hourly_count'])\
    .orderBy('hourly_count', ascending=False)\
    .show(5, truncate=False)
```

```text
+-------------+-----------------------+------------+
|language_code|article_title          |hourly_count|
+-------------+-----------------------+------------+
|kw.m         |Justė_Arlauskaitė      |16          |
|kw.m         |Lithouani              |2           |
|kw.m         |Bresel_Diabarth_Spayn  |1           |
|kw.m         |Can_an_Pescador_Kernûak|1           |
|kw.m         |Nolwenn_Leroy          |1           |
+-------------+-----------------------+------------+
```
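
Several rows tie at `hourly_count` = 1, and Spark does not guarantee an order among tied rows, so which of them appear in the top five can vary between runs. Adding a second sort key, for example `.orderBy(['hourly_count', 'article_title'], ascending=[False, True])`, makes the result deterministic. The plain-Python sketch below mirrors the select, order, and limit steps on the eight `kw.m` rows shown earlier; `top_rows` is a hypothetical helper. With the tie-breaker, `Ferdinand_Magellan` rather than `Nolwenn_Leroy` takes fifth place.

```python
from typing import Any

Record = dict[str, Any]


def top_rows(rows: list[Record], columns: list[str], n: int) -> list[Record]:
    """select(columns), then sort by hourly_count descending and article_title ascending."""
    projected = [{col: row[col] for col in columns} for row in rows]          # select()
    projected.sort(key=lambda r: (-r["hourly_count"], r["article_title"]))    # orderBy()
    return projected[:n]                                                      # show(n)


kw_rows = [
    {"language_code": "kw.m", "article_title": title, "hourly_count": count}
    for title, count in [
        ("Bresel_Diabarth_Spayn", 1), ("Can_an_Pescador_Kernûak", 1),
        ("Ferdinand_Magellan", 1), ("Justė_Arlauskaitė", 16), ("Lithouani", 2),
        ("Nolwenn_Leroy", 1), ("Ohio", 1), ("Taywan", 1),
    ]
]
for row in top_rows(kw_rows, ["article_title", "hourly_count"], 5):
    print(row)
```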

### `DataFrame.groupBy()`

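The `groupBy()` method groups rows that share the same values in the given column(s) and returns a `GroupedData` object; an aggregation such as `sum()`, `count()`, or `agg()` turns the groups back into a DataFrame. Analogous to the SQL `GROUP BY` clause.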

Compute the sum of `hourly_count` for each `language_code`:

```python
hrly_views_df\
    .select(['language_code', 'hourly_count'])\
    .groupBy('language_code')\
    .sum()\
    .orderBy('sum(hourly_count)', ascending=False)\
    .show(5, truncate=False)
```

```text
+-------------+-----------------+
|language_code|sum(hourly_count)|
+-------------+-----------------+
|en.m         |8095763          |
|en           |2693185          |
|de.m         |1313505          |
|es.m         |963835           |
|ru.m         |927583           |
+-------------+-----------------+
```
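
Spark computes a grouped sum in two stages: each partition first sums its own rows per key, and only those partial sums are shuffled and merged, which keeps the data moved between machines small. The plain-Python sketch below illustrates that pattern with lists standing in for partitions; the helpers and sample numbers are hypothetical.

```python
from collections import defaultdict
from typing import Iterable

Pair = tuple[str, int]  # (language_code, hourly_count)


def partial_sums(partition: Iterable[Pair]) -> dict[str, int]:
    """Stage 1: sum counts per language within a single partition."""
    sums: defaultdict[str, int] = defaultdict(int)
    for code, count in partition:
        sums[code] += count
    return dict(sums)


def merge_sums(partials: Iterable[dict[str, int]]) -> dict[str, int]:
    """Stage 2: merge the per-partition subtotals into final totals."""
    totals: defaultdict[str, int] = defaultdict(int)
    for partial in partials:
        for code, subtotal in partial.items():
            totals[code] += subtotal
    return dict(totals)


def top_languages(partitions: list[list[Pair]], n: int) -> list[Pair]:
    """Group, sum, and order by the sum descending, keeping the first n results."""
    totals = merge_sums(partial_sums(p) for p in partitions)
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)[:n]


partitions = [
    [("en", 2), ("de", 1)],
    [("en", 10), ("kw.m", 16)],
    [("de", 3)],
]
print(top_languages(partitions, 2))  # [('kw.m', 16), ('en', 12)]
```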

## Querying PySpark DataFrames with SQL

### `DataFrame.createOrReplaceTempView()`

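The `createOrReplaceTempView()` method registers the DataFrame as a temporary view under the given name, replacing any existing view with that name. The view lasts for the lifetime of the `SparkSession` and can be queried with SQL.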

```python
hrly_views_df.createOrReplaceTempView('hourly_counts')
```

### `SparkSession.sql()`

The `spark.sql()` method runs a SQL query against the registered views and returns the result as a new DataFrame.

```python
query = """SELECT * FROM hourly_counts WHERE language_code = 'kw.m'"""
spark.sql(query).show(truncate=False)
```

```text
+-------------+-----------------------+------------+
|language_code|article_title          |hourly_count|
+-------------+-----------------------+------------+
|kw.m         |Bresel_Diabarth_Spayn  |1           |
|kw.m         |Can_an_Pescador_Kernûak|1           |
|kw.m         |Ferdinand_Magellan     |1           |
|kw.m         |Justė_Arlauskaitė      |16          |
|kw.m         |Lithouani              |2           |
|kw.m         |Nolwenn_Leroy          |1           |
|kw.m         |Ohio                   |1           |
|kw.m         |Taywan                 |1           |
+-------------+-----------------------+------------+
```

```python
query = """SELECT language_code, article_title, hourly_count
    FROM hourly_counts
    WHERE language_code = 'kw.m'
    ORDER BY hourly_count DESC"""

spark.sql(query).show(5, truncate=False)
```

```text
+-------------+-----------------------+------------+
|language_code|article_title          |hourly_count|
+-------------+-----------------------+------------+
|kw.m         |Justė_Arlauskaitė      |16          |
|kw.m         |Lithouani              |2           |
|kw.m         |Bresel_Diabarth_Spayn  |1           |
|kw.m         |Can_an_Pescador_Kernûak|1           |
|kw.m         |Nolwenn_Leroy          |1           |
+-------------+-----------------------+------------+
```

```python
query = """SELECT language_code, SUM(hourly_count) AS sum_hourly_count
    FROM hourly_counts
    GROUP BY language_code
    ORDER BY sum_hourly_count DESC"""

spark.sql(query).show(5, truncate=False)
```

```text
+-------------+----------------+
|language_code|sum_hourly_count|
+-------------+----------------+
|en.m         |8095763         |
|en           |2693185         |
|de.m         |1313505         |
|es.m         |963835          |
|ru.m         |927583          |
+-------------+----------------+
```
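
Because a temporary view is just a named table that SQL can reference, the same queries can be tried outside Spark. The sketch below uses Python's built-in `sqlite3` module as a stand-in (an assumption for illustration only: SQLite is not Spark SQL, and the two dialects differ in places) with a few hypothetical rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# The table plays the role of the `hourly_counts` temporary view
conn.execute(
    "CREATE TABLE hourly_counts (language_code TEXT, article_title TEXT, hourly_count INTEGER)"
)
conn.executemany(
    "INSERT INTO hourly_counts VALUES (?, ?, ?)",
    [
        ("kw.m", "Lithouani", 2),
        ("kw.m", "Justė_Arlauskaitė", 16),
        ("en", "Ohio", 1),
        ("en", "Troy,_Alabama", 1),
        ("de", "Ohio", 3),
    ],
)

kw_rows = conn.execute(
    """SELECT language_code, article_title, hourly_count
    FROM hourly_counts
    WHERE language_code = 'kw.m'
    ORDER BY hourly_count DESC"""
).fetchall()

language_totals = conn.execute(
    """SELECT language_code, SUM(hourly_count) AS sum_hourly_count
    FROM hourly_counts
    GROUP BY language_code
    ORDER BY sum_hourly_count DESC"""
).fetchall()
conn.close()

print(kw_rows)          # [('kw.m', 'Justė_Arlauskaitė', 16), ('kw.m', 'Lithouani', 2)]
print(language_totals)  # [('kw.m', 18), ('de', 3), ('en', 2)]
```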

## Saving PySpark DataFrames

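### `DataFrame.write`

The `write` property of a DataFrame returns a `DataFrameWriter`, which saves the DataFrame to external storage. Below, `select()` picks the columns to save, and `mode="overwrite"` replaces any existing output at the path; the other save modes are `"append"`, `"ignore"`, and `"error"` (alias `"errorifexists"`, the default). Spark writes a directory of part files rather than a single CSV file, and by default the CSV output has no header row.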

```python
hrly_views_df\
    .select(['language_code', 'article_title', 'hourly_count'])\
    .write.csv('cleaned/csv/views_2022_01_01_000000/', mode="overwrite")
```
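
The `mode` argument decides what happens when the output path already exists. The plain-Python sketch below models the four save modes on a local directory of part files; `save_parts` and `demo` are hypothetical helpers that only imitate this behavior (Spark's `overwrite` replaces the whole output directory, which the sketch approximates by deleting earlier part files).

```python
import tempfile
from pathlib import Path

SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def save_parts(path: Path, lines: list[str], mode: str = "errorifexists") -> bool:
    """Write `lines` as a new part file under `path`; return False if skipped."""
    if mode not in SAVE_MODES:
        raise ValueError(f"unknown save mode: {mode}")
    if path.exists():
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"path {path} already exists")
        if mode == "ignore":
            return False  # leave existing data untouched, write nothing
        if mode == "overwrite":
            for old_part in path.glob("part-*"):
                old_part.unlink()
    path.mkdir(parents=True, exist_ok=True)
    part_number = len(list(path.glob("part-*")))
    (path / f"part-{part_number:05d}.csv").write_text("\n".join(lines) + "\n")
    return True


def demo() -> list[int]:
    """Count part files after each write to show how the modes differ."""
    counts = []
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "views"
        save_parts(out, ["en,Ohio,1"])                     # path is new: write
        counts.append(len(list(out.glob("part-*"))))
        save_parts(out, ["kw.m,Ohio,1"], mode="append")    # add a second part file
        counts.append(len(list(out.glob("part-*"))))
        save_parts(out, ["de,Ohio,3"], mode="overwrite")   # replace earlier parts
        counts.append(len(list(out.glob("part-*"))))
        save_parts(out, ["ru,Ohio,4"], mode="ignore")      # path exists: skip
        counts.append(len(list(out.glob("part-*"))))
    return counts


print(demo())  # [1, 2, 1, 1]
```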

Read the output back with `spark.read` to check whether it matches the original DataFrame:

```python
# Read DataFrame back from disk
hrly_views_df_restored = spark.read\
    .csv('cleaned/csv/views_2022_01_01_000000/')
hrly_views_df_restored.printSchema()
```

```text
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
```
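
The same loss of names and types happens with any plain CSV round trip. The sketch below writes typed rows with Python's `csv` module and reads them back, naming the columns `_c0`, `_c1`, ... the way Spark does when no header is used; `read_headerless_csv` is a hypothetical helper.

```python
import csv
import io


def read_headerless_csv(text: str) -> tuple[list[str], list[list[str]]]:
    """Read CSV text with no header: columns get positional names, values stay strings."""
    rows = list(csv.reader(io.StringIO(text)))
    width = max((len(row) for row in rows), default=0)
    return [f"_c{i}" for i in range(width)], rows


buffer = io.StringIO()
# Integers go in, but CSV stores only their text
csv.writer(buffer, lineterminator="\n").writerows(
    [("en", "Ohio", 1), ("kw.m", "Justė_Arlauskaitė", 16)]
)
names, rows = read_headerless_csv(buffer.getvalue())
print(names)  # ['_c0', '_c1', '_c2']
print(rows)   # [['en', 'Ohio', '1'], ['kw.m', 'Justė_Arlauskaitė', '16']]
```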

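CSV files do not store data types, and the reader above was not told to use a header row or infer types, so Spark names the columns `_c0`, `_c1`, `_c2` and reads every value as a string. Parquet, a columnar file format, stores the schema alongside the data and offers efficient compression and encoding.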

```python
# Write the selected columns to Parquet
hrly_views_df\
    .select(['language_code', 'article_title', 'hourly_count'])\
    .write.parquet('cleaned/parquet/views_2022_01_01_000000/', mode="overwrite")

# Read Parquet as DataFrame
hrly_views_df_restored = spark.read\
    .parquet('cleaned/parquet/views_2022_01_01_000000/')

# Check DataFrame's schema
hrly_views_df_restored.printSchema()
```

```text
root
|-- language_code: string (nullable = true)
|-- article_title: string (nullable = true)
|-- hourly_count: integer (nullable = true)
```