<a href="https://colab.research.google.com/github/lucylu9161/test/blob/main/Working_with_DF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Working with DF (DataFrame)**

**`Udemy Course: Best Hands-on Big Data Practices and Use Cases using PySpark`**

**`Author: Amin Karami (PhD, FHEA)`**

---

**DataFrame (DF)**: a schema (named columns) plus a declarative query language. A DataFrame is a Dataset organized into named columns and is conceptually equivalent to a table in a relational database. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs. They are very efficient for structured data.

source: https://spark.apache.org/docs/latest/sql-programming-guide.html

source: https://spark.apache.org/docs/latest/api/python/reference/

In [2]:
########## ONLY in Colab ##########
!pip3 install pyspark
########## ONLY in Colab ##########

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=93c9b332eeb4d806b7e182b03837078497f89c8beb4cad0d205acad87bd6f419
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
########## ONLY in Ubuntu Machine ##########
# Load Spark engine
!pip3 install -q findspark
import findspark
findspark.init()
########## ONLY in Ubuntu Machine ##########

In [3]:
# Linking with Spark (https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html)
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

spark

# **Part 1: Create DF and Basic Operations**

In [5]:
# Create/Load DF: (Spark automatically scans through the files and infers the schema of the dataset)
# data source: https://www.kaggle.com/thec03u5/fifa-18-demo-player-dataset

df1 = spark.read.format("csv").load("CompleteDataset.csv", inferSchema=True, header=True)

In [6]:
# Show data:
df1.show()

+---+-----------------+---+--------------------+-----------+--------------------+-------+---------+-------------------+--------------------+------+-----+-------+------------+----------+-------+-------+------------+---------+--------+-----+---------+---------+------------------+---------+-----------+----------+--------------+-----------+----------------+-------------+-------+------------+----------+-------+---------+-----------+---------+-------------+----------+--------------+------------+-------+---------------+--------+------+-------+----+----+----+----+----+------+----+----+----+----+----+----+----+----+----+----+-------------------+----+----+----+----+----+----+----+----+----+----+----+
|_c0|             Name|Age|               Photo|Nationality|                Flag|Overall|Potential|               Club|           Club Logo| Value| Wage|Special|Acceleration|Aggression|Agility|Balance|Ball control|Composure|Crossing|Curve|Dribbling|Finishing|Free kick accuracy|GK diving|GK handling|

In [7]:
# How many partitions in DF?
df1.rdd.getNumPartitions()

2

In [8]:
# Increase/Decrease the partitions in DF
df2 = df1.repartition(4)
df2.rdd.getNumPartitions()

4

In [9]:
# Show DF
df2.show()

+-----+-------------+---+--------------------+-------------+--------------------+-------+---------+--------------------+--------------------+------+-----+-------+------------+----------+-------+-------+------------+---------+--------+-----+---------+---------+------------------+---------+-----------+----------+--------------+-----------+----------------+-------------+-------+------------+----------+-------+---------+-----------+---------+-------------+----------+--------------+------------+-------+---------------+--------+------+-------+----+----+----+----+----+------+----+----+----+----+----+----+----+----+----+----+-------------------+----+----+----+----+----+----+----+----+----+----+----+
|  _c0|         Name|Age|               Photo|  Nationality|                Flag|Overall|Potential|                Club|           Club Logo| Value| Wage|Special|Acceleration|Aggression|Agility|Balance|Ball control|Composure|Crossing|Curve|Dribbling|Finishing|Free kick accuracy|GK diving|GK handlin

In [10]:
df2.columns

['_c0',
 'Name',
 'Age',
 'Photo',
 'Nationality',
 'Flag',
 'Overall',
 'Potential',
 'Club',
 'Club Logo',
 'Value',
 'Wage',
 'Special',
 'Acceleration',
 'Aggression',
 'Agility',
 'Balance',
 'Ball control',
 'Composure',
 'Crossing',
 'Curve',
 'Dribbling',
 'Finishing',
 'Free kick accuracy',
 'GK diving',
 'GK handling',
 'GK kicking',
 'GK positioning',
 'GK reflexes',
 'Heading accuracy',
 'Interceptions',
 'Jumping',
 'Long passing',
 'Long shots',
 'Marking',
 'Penalties',
 'Positioning',
 'Reactions',
 'Short passing',
 'Shot power',
 'Sliding tackle',
 'Sprint speed',
 'Stamina',
 'Standing tackle',
 'Strength',
 'Vision',
 'Volleys',
 'CAM',
 'CB',
 'CDM',
 'CF',
 'CM',
 'ID',
 'LAM',
 'LB',
 'LCB',
 'LCM',
 'LDM',
 'LF',
 'LM',
 'LS',
 'LW',
 'LWB',
 'Preferred Positions',
 'RAM',
 'RB',
 'RCB',
 'RCM',
 'RDM',
 'RF',
 'RM',
 'RS',
 'RW',
 'RWB',
 'ST']

In [11]:
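# Rename columns and fill NULLs:
# (the dataset already has an "ID" column, so after this rename two columns share the name "ID")
df2 = df2.withColumnRenamed("_c0", "ID") \
    .withColumnRenamed("Ball control", "Ball_Control") \
    .withColumnRenamed("Sliding tackle", "Sliding_Tackle")

# na.fill returns a new DataFrame; df2 itself is unchanged unless the result is assigned
df2.na.fill({"RAM": 10, "RB": 1}).show()
#df2.dropna()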

+-----+-------------+---+--------------------+-------------+--------------------+-------+---------+--------------------+--------------------+------+-----+-------+------------+----------+-------+-------+------------+---------+--------+-----+---------+---------+------------------+---------+-----------+----------+--------------+-----------+----------------+-------------+-------+------------+----------+-------+---------+-----------+---------+-------------+----------+--------------+------------+-------+---------------+--------+------+-------+----+----+----+----+----+------+----+----+----+----+----+----+----+----+----+----+-------------------+----+----+----+----+----+----+----+----+----+----+----+
|   ID|         Name|Age|               Photo|  Nationality|                Flag|Overall|Potential|                Club|           Club Logo| Value| Wage|Special|Acceleration|Aggression|Agility|Balance|Ball_Control|Composure|Crossing|Curve|Dribbling|Finishing|Free kick accuracy|GK diving|GK handlin

In [None]:
# Transformation (SELECT):
df2.select("Name","Overall").distinct().show()

In [None]:
# Transformation (FILTER):
df2.filter(df2["Overall"] > 70).show()

In [None]:
# Transformation (SELECT + WHERE):
df2.select("Overall", "Name", "Age").where(df2["Overall"]>70).show()

In [None]:
# Transformation (FILTER + GROUP BY + SORT):
df2.where(df2["Overall"]>70).groupBy("Age").count().sort("Age").show()

In [None]:
# Visualize the results:
df2_result = df2.where(df2["Overall"]>70).groupBy("Age").count().sort("Age")

pandas_df = df2_result.toPandas()
pandas_df.plot(x = "Age", y = "count", kind = "bar")


In [None]:
pandas_df.sort_values(by="count", ascending=False).plot(x = "Age", y = "count", kind = "bar")

# **Part 2: Advanced DF Operations: Spark SQL and UDF**

In [None]:
# Spark SQL (Register the DF using a local temporary view):

df2.createOrReplaceTempView("df_football")

In [None]:
# SQL Query:

sql_query = """ SELECT Age, count(*) as Count
                FROM df_football
                WHERE Overall > 70
                GROUP BY Age
                ORDER BY Age """

result = spark.sql(sql_query)
result.show()

In [None]:
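# UDF (User Defined Functions):
def uppercase_converter(record):
  # Return NULL values unchanged (len(None) would raise a TypeError)
  if record is None:
    return None
  # Upper-case strings longer than 10 characters; lower-case the rest
  if len(record) > 10:
    return record.upper()
  else:
    return record.lower()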

# register the DF
df2.createOrReplaceTempView("UDF_football")

# register the function (note: the name UPPER coincides with Spark SQL's built-in upper function)
spark.udf.register("UPPER", uppercase_converter)

# use the UDF in SQL
sql_query = "SELECT Age, UPPER(Name) as Name, UPPER(Club) as Club FROM UDF_football"

result = spark.sql(sql_query)
result.show()
