## DataFrames

A DataFrame is a distributed collection of data organized into rows and named columns. DataFrames are built on top of RDDs. They are easier to work with than RDDs, and Spark can optimize complicated operations on them more effectively.

In [1]:
from pyspark import SparkContext

# create a spark context
sc = SparkContext("local", "DataFrames")

In [2]:
from pyspark.sql import SparkSession

# create a spark session
spark_session = SparkSession.builder.getOrCreate()

print(spark_session)

<pyspark.sql.session.SparkSession object at 0x000001D0E7F14B88>


### How can I read a DataFrame?

Files are read through ```spark_session.read```, which returns a **DataFrameReader**.

In [3]:
df = spark_session.read.csv("sample_data/301-people-info.csv", header=True)

# register in the catalog
df.createOrReplaceTempView("person_info")

# list tables in catalog
spark_session.catalog.listTables()

[Table(name='person_info', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

There are other ways to read files:

    - df1 = spark_session.read.json("301-people-info.json")
    - df3 = spark_session.read.load("301-people-info.parquet")
    - df4 = spark_session.read.text("301-people-info.txt")

### Creating a DataFrame from a RDD

```rdd.toDF(schema)``` - returns a DataFrame; when ```schema``` is a list of names, the columns are named in that order

In [4]:
rdd = sc.parallelize((("James", "Sales", 3000),
  ("Michael", "Sales", 4600),
  ("Robert", "Sales", 4100),
  ("Maria", "Finance", 3000),
))
rdd.collect()

[('James', 'Sales', 3000),
 ('Michael', 'Sales', 4600),
 ('Robert', 'Sales', 4100),
 ('Maria', 'Finance', 3000)]

In [5]:
df_from_rdd = rdd.toDF(["name", "office", "salary"])
df_from_rdd.show()

+-------+-------+------+
|   name| office|salary|
+-------+-------+------+
|  James|  Sales|  3000|
|Michael|  Sales|  4600|
| Robert|  Sales|  4100|
|  Maria|Finance|  3000|
+-------+-------+------+



### Content analysis

```df.dtypes``` returns a list of (column name, data type) pairs, one per column

In [6]:
df.dtypes

[('name', 'string'),
 ('surname', 'string'),
 ('street', 'string'),
 ('city', 'string'),
 ('state', 'string'),
 ('postalcode', 'string'),
 ('salary', 'string')]

```df.show(n)``` prints the first n rows as a table

In [7]:
df.show(5)

+-----------+--------+--------------------+---------+-----+----------+------+
|       name| surname|              street|     city|state|postalcode|salary|
+-----------+--------+--------------------+---------+-----+----------+------+
|       John|     Doe|   120 jefferson st.|Riverside|   NJ|     08075| 10000|
|       Jack|McGinnis|        220 hobo Av.|    Phila|   PA|     09119| 18900|
|John Da Man|  Repici|   120 Jefferson St.|Riverside|   NJ|     08075| 32100|
|    Stephen|   Tyler|7452 Terrace At t...| SomeTown|   SD|     91234| 32908|
|       null|Blankman|                null| SomeTown|   SD|     00298| 44221|
+-----------+--------+--------------------+---------+-----+----------+------+
only showing top 5 rows



```df.head(n)``` returns the first n rows as a list of **Row** objects

In [8]:
df.head(2)

[Row(name='John', surname='Doe', street='120 jefferson st.', city='Riverside', state='NJ', postalcode='08075', salary='10000'),
 Row(name='Jack', surname='McGinnis', street='220 hobo Av.', city='Phila', state='PA', postalcode='09119', salary='18900')]

```df.first()``` returns the first row as a **Row**

In [9]:
df.first()

Row(name='John', surname='Doe', street='120 jefferson st.', city='Riverside', state='NJ', postalcode='08075', salary='10000')

```df.take(n)``` returns the first n rows as a list of **Row** objects, like ```df.head(n)```

In [10]:
df.take(3)

[Row(name='John', surname='Doe', street='120 jefferson st.', city='Riverside', state='NJ', postalcode='08075', salary='10000'),
 Row(name='Jack', surname='McGinnis', street='220 hobo Av.', city='Phila', state='PA', postalcode='09119', salary='18900'),
 Row(name='John Da Man', surname='Repici', street='120 Jefferson St.', city='Riverside', state='NJ', postalcode='08075', salary='32100')]

```df.schema``` returns the DataFrame's schema

In [11]:
df.schema

StructType(List(StructField(name,StringType,true),StructField(surname,StringType,true),StructField(street,StringType,true),StructField(city,StringType,true),StructField(state,StringType,true),StructField(postalcode,StringType,true),StructField(salary,StringType,true)))

```df.describe(*cols)``` computes summary statistics (count, mean, stddev, min, max) for numeric and string columns

In [12]:
df.describe('name', 'salary').show()

+-------+-------+------------------+
|summary|   name|            salary|
+-------+-------+------------------+
|  count|      6|                 7|
|   mean|   null|24418.428571428572|
| stddev|   null| 12265.50534163653|
|    min|   Jack|             10000|
|    max|Stephen|             44221|
+-------+-------+------------------+
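
As a sanity check, the figures in this table can be reproduced in plain Python. The sketch below assumes that `stddev` is the sample standard deviation (n - 1 denominator), which agrees with the printed value. The `salaries` and `names` lists are typed in by hand from the outputs in this section; they are not read from Spark.

```python
import statistics

# Salary values copied by hand from the df.show() outputs in this section.
salaries = [10000, 18900, 32100, 32908, 44221, 13900, 18900]
# Names from the same rows; None stands for the null name.
names = ["John", "Jack", "John Da Man", "Stephen", None,
         "Joan the bone Anne", "Jack"]

mean_salary = statistics.mean(salaries)
# statistics.stdev is the sample standard deviation (n - 1 denominator).
stddev_salary = statistics.stdev(salaries)

# Nulls are skipped, which is why the name column reports a count of 6.
non_null_names = [n for n in names if n is not None]

print("count(name):", len(non_null_names))
print("mean(salary):", mean_salary)
print("stddev(salary):", stddev_salary)
# For strings, min and max follow lexicographic order.
print("min(name):", min(non_null_names), "max(name):", max(non_null_names))
```

Note that `mean` and `stddev` are null for `name` because those statistics are not defined for text, while `min` and `max` still work through string ordering.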



```df.columns``` returns the list of column names

In [13]:
df.columns

['name', 'surname', 'street', 'city', 'state', 'postalcode', 'salary']

```df.count()``` returns the number of rows

In [14]:
df.count()

7

```df.printSchema()``` prints the schema in a tree format

In [15]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postalcode: string (nullable = true)
 |-- salary: string (nullable = true)



### Removing duplicated values

```df.distinct()``` returns a new DataFrame without duplicate rows, comparing all columns

In [16]:
df_temp = df.distinct()
df_temp.show()

+------------------+--------+--------------------+-----------+-----+----------+------+
|              name| surname|              street|       city|state|postalcode|salary|
+------------------+--------+--------------------+-----------+-----+----------+------+
|           Stephen|   Tyler|7452 Terrace At t...|   SomeTown|   SD|     91234| 32908|
|       John Da Man|  Repici|   120 Jefferson St.|  Riverside|   NJ|     08075| 32100|
|              null|Blankman|                null|   SomeTown|   SD|     00298| 44221|
|Joan the bone Anne|     Jet|  9th at Terrace plc|Desert City|   CO|     00123| 13900|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
|              John|     Doe|   120 jefferson st.|  Riverside|   NJ|     08075| 10000|
+------------------+--------+--------------------+-----------+-----+----------+------+



```df.dropDuplicates(subset)``` returns a new DataFrame without duplicate rows, comparing only the columns in ```subset```

In [17]:
df_temp = df.dropDuplicates(subset=['state'])
df_temp.show()

+------------------+--------+--------------------+-----------+-----+----------+------+
|              name| surname|              street|       city|state|postalcode|salary|
+------------------+--------+--------------------+-----------+-----+----------+------+
|              John|     Doe|   120 jefferson st.|  Riverside|   NJ|     08075| 10000|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
|           Stephen|   Tyler|7452 Terrace At t...|   SomeTown|   SD|     91234| 32908|
|Joan the bone Anne|     Jet|  9th at Terrace plc|Desert City|   CO|     00123| 13900|
+------------------+--------+--------------------+-----------+-----+----------+------+
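
The difference between the two calls comes down to which columns form the comparison key. The following plain-Python sketch models that idea on a reduced copy of the data (three columns only). The helper `drop_duplicates` is hypothetical and is not how Spark implements the operation; in particular, it keeps the first row seen for each key, whereas Spark does not promise which of the duplicate rows survives.

```python
# Rows reduced to (name, state, salary); values copied from the outputs above.
columns = ("name", "state", "salary")
rows = [
    ("John", "NJ", "10000"),
    ("Jack", "PA", "18900"),
    ("John Da Man", "NJ", "32100"),
    ("Stephen", "SD", "32908"),
    (None, "SD", "44221"),
    ("Joan the bone Anne", "CO", "13900"),
    ("Jack", "PA", "18900"),
]


def drop_duplicates(rows, columns, subset=None):
    """Keep one row per distinct key; the key is the whole row when subset is None."""
    indexes = [columns.index(c) for c in (subset or columns)]
    kept = {}
    for row in rows:
        key = tuple(row[i] for i in indexes)
        # This model keeps the first row seen for a key (an assumption of the sketch).
        kept.setdefault(key, row)
    return list(kept.values())


# Comparable to df.distinct(): the key is every column.
distinct_rows = drop_duplicates(rows, columns)
# Comparable to df.dropDuplicates(subset=['state']): the key is the state only.
by_state = drop_duplicates(rows, columns, subset=["state"])

print(len(distinct_rows), "rows after comparing all columns")
print(len(by_state), "rows after comparing only 'state'")
```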



### Null values

```.na.fill(value, subset)``` - returns a new DataFrame with null values replaced by ```value``` in the listed columns.

- Alias:
    - ```DataFrame.fillna()```

In [18]:
df.na.fill("unnamed", ["name", "surname", "street", "city"]).show() 

+------------------+--------+--------------------+-----------+-----+----------+------+
|              name| surname|              street|       city|state|postalcode|salary|
+------------------+--------+--------------------+-----------+-----+----------+------+
|              John|     Doe|   120 jefferson st.|  Riverside|   NJ|     08075| 10000|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
|       John Da Man|  Repici|   120 Jefferson St.|  Riverside|   NJ|     08075| 32100|
|           Stephen|   Tyler|7452 Terrace At t...|   SomeTown|   SD|     91234| 32908|
|           unnamed|Blankman|             unnamed|   SomeTown|   SD|     00298| 44221|
|Joan the bone Anne|     Jet|  9th at Terrace plc|Desert City|   CO|     00123| 13900|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
+------------------+--------+--------------------+-----------+-----+----------+------+



In [19]:
df.fillna("unnamed", ["name", "surname", "street", "city"]).show() 

+------------------+--------+--------------------+-----------+-----+----------+------+
|              name| surname|              street|       city|state|postalcode|salary|
+------------------+--------+--------------------+-----------+-----+----------+------+
|              John|     Doe|   120 jefferson st.|  Riverside|   NJ|     08075| 10000|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
|       John Da Man|  Repici|   120 Jefferson St.|  Riverside|   NJ|     08075| 32100|
|           Stephen|   Tyler|7452 Terrace At t...|   SomeTown|   SD|     91234| 32908|
|           unnamed|Blankman|             unnamed|   SomeTown|   SD|     00298| 44221|
|Joan the bone Anne|     Jet|  9th at Terrace plc|Desert City|   CO|     00123| 13900|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
+------------------+--------+--------------------+-----------+-----+----------+------+



```.na.drop()``` - returns a new DataFrame dropping rows with null values. 

- Alias:
    - ```DataFrame.dropna()```

In [20]:
df.na.drop().show()

+------------------+--------+--------------------+-----------+-----+----------+------+
|              name| surname|              street|       city|state|postalcode|salary|
+------------------+--------+--------------------+-----------+-----+----------+------+
|              John|     Doe|   120 jefferson st.|  Riverside|   NJ|     08075| 10000|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
|       John Da Man|  Repici|   120 Jefferson St.|  Riverside|   NJ|     08075| 32100|
|           Stephen|   Tyler|7452 Terrace At t...|   SomeTown|   SD|     91234| 32908|
|Joan the bone Anne|     Jet|  9th at Terrace plc|Desert City|   CO|     00123| 13900|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
+------------------+--------+--------------------+-----------+-----+----------+------+
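
Both operations return a new DataFrame and leave the original untouched. The plain-Python sketch below models that behaviour on two reduced rows. The helpers `fill_nulls` and `drop_nulls` are hypothetical names used only for illustration, and `drop_nulls` assumes the default behaviour of dropping a row when any of its values is null.

```python
# Two reduced rows copied from the outputs above; None stands for null.
rows = [
    {"name": "Stephen", "surname": "Tyler", "city": "SomeTown"},
    {"name": None, "surname": "Blankman", "city": "SomeTown"},
]


def fill_nulls(rows, value, subset):
    """Return new rows with None replaced by value, in the subset columns only."""
    return [
        {col: (value if col in subset and cell is None else cell)
         for col, cell in row.items()}
        for row in rows
    ]


def drop_nulls(rows):
    """Return only the rows that contain no None at all."""
    return [row for row in rows if all(cell is not None for cell in row.values())]


filled = fill_nulls(rows, "unnamed", subset=["name", "surname"])
dropped = drop_nulls(rows)

print(filled[1]["name"])   # the null name was filled in the copy
print(rows[1]["name"])     # the original rows are unchanged
print(len(dropped), "row(s) left after dropping rows with nulls")
```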



```.na.replace(to_replace, value, subset)``` - returns a new DataFrame in which ```to_replace``` is replaced by ```value``` in the given subset of columns

- Alias:
    -  ```DataFrame.replace()```


In [21]:
df.na.replace('08075', '00895', subset=['postalcode']).show()

+------------------+--------+--------------------+-----------+-----+----------+------+
|              name| surname|              street|       city|state|postalcode|salary|
+------------------+--------+--------------------+-----------+-----+----------+------+
|              John|     Doe|   120 jefferson st.|  Riverside|   NJ|     00895| 10000|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
|       John Da Man|  Repici|   120 Jefferson St.|  Riverside|   NJ|     00895| 32100|
|           Stephen|   Tyler|7452 Terrace At t...|   SomeTown|   SD|     91234| 32908|
|              null|Blankman|                null|   SomeTown|   SD|     00298| 44221|
|Joan the bone Anne|     Jet|  9th at Terrace plc|Desert City|   CO|     00123| 13900|
|              Jack|McGinnis|        220 hobo Av.|      Phila|   PA|     09119| 18900|
+------------------+--------+--------------------+-----------+-----+----------+------+



## Increasing and decreasing the number of partitions

Here a temporary DataFrame is created only to demonstrate ```.repartition``` and ```.coalesce```.

  - ```.repartition(n)``` - returns a new DataFrame with n partitions. It can increase or decrease the number of partitions and always performs a full shuffle to redistribute the data.
  - ```.coalesce(n)``` - returns a new DataFrame reduced to n partitions. It merges existing partitions and avoids a full shuffle, so it usually performs better than ```.repartition()``` when decreasing the number of partitions.
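
The difference between the two can be pictured with a small plain-Python model in which each partition is a list of records. This is only a conceptual sketch: the functions `repartition` and `coalesce` below are hypothetical stand-ins, and the way Spark actually chooses which partitions to merge (for example by data locality) is not modelled.

```python
def repartition(partitions, n):
    """Model of a full shuffle: every record may land in a different partition."""
    records = [rec for part in partitions for rec in part]
    new_parts = [[] for _ in range(n)]
    for i, rec in enumerate(records):
        new_parts[i % n].append(rec)  # round-robin redistribution of single records
    return new_parts


def coalesce(partitions, n):
    """Model of coalesce: merge whole partitions together, never split one."""
    if n >= len(partitions):
        # Asking for more partitions than exist leaves the count unchanged.
        return [list(part) for part in partitions]
    new_parts = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        new_parts[i % n].extend(part)  # each old partition moves as one block
    return new_parts


parts = [["a", "b"], ["c", "d"], ["e", "f"], ["g"]]

print("repartition to 2:", repartition(parts, 2))
print("coalesce to 2:   ", coalesce(parts, 2))
print("coalesce to 8 keeps", len(coalesce(parts, 8)), "partitions")
```

In the model, records that started in the same partition stay together after `coalesce`, while `repartition` scatters them; that is the reason reducing partitions with `.coalesce()` tends to be cheaper.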

In [22]:
df_temp = df.fillna("null")
print("Comparing address: ")
print("    df_temp: " , hex(id(df_temp)))
print("    df: " , hex(id(df)))

Comparing address: 
    df_temp:  0x1d0eab3a388
    df:  0x1d0e9632c88


In [23]:
print("Before repartition:", df_temp.rdd.getNumPartitions())
df_temp = df_temp.repartition(4)
print("After repartition:", df_temp.rdd.getNumPartitions())

Before repartition: 1
After repartition: 4


In [24]:
print("Before coalesce:", df_temp.rdd.getNumPartitions())
df_temp = df_temp.coalesce(1)
print("After coalesce:", df_temp.rdd.getNumPartitions())

Before coalesce: 4
After coalesce: 1
