## Instalando a biblioteca PYSpark

In [1]:
!pip install pyspark



In [2]:
!pip install findspark



In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [4]:
df = spark.sql('''select 'Sucesso total, estamos online!' as hello''')
df.show()

+--------------------+
|               hello|
+--------------------+
|Sucesso total, es...|
+--------------------+



In [5]:
# Import spark libraries
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F # for more sql functions
from functools import reduce

## Data Manipulation using Spark

In [6]:
path = r'D:\Documentos\FIAP - Pos Tech Data Analytics\Aulas\Fase 3\Dados\banklist.csv'
df = spark.read.csv(path, sep= ',', inferSchema=True, header=True)

print('df.count: ', df.count())
print('df.col ct: ', len(df.columns))
print('df.columns: ', df.columns)

df.count:  561
df.col ct:  6
df.columns:  ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


## Using SQL in PySpark

In [7]:
df.createOrReplaceTempView('banklist')

df_check = spark.sql('''select `Bank Name`, `City`, `Closing Date` FROM banklist''')
df_check.show(4, truncate=False)

+--------------------------------+-------------+------------+
|Bank Name                       |City         |Closing Date|
+--------------------------------+-------------+------------+
|The First State Bank            |Barboursville|3-Apr-20    |
|Ericson State Bank              |Ericson      |14-Feb-20   |
|City National Bank of New Jersey|Newark       |1-Nov-19    |
|Resolute Bank                   |Maumee       |25-Oct-19   |
+--------------------------------+-------------+------------+
only showing top 4 rows



## DataFrame Basic Operations

In [8]:
df.describe().show()

+-------+--------------------+-------+----+-----------------+---------------------+------------+
|summary|           Bank Name|   City|  ST|             CERT|Acquiring Institution|Closing Date|
+-------+--------------------+-------+----+-----------------+---------------------+------------+
|  count|                 561|    561| 561|              561|                  561|         561|
|   mean|                NULL|   NULL|NULL|31685.68449197861|                 NULL|        NULL|
| stddev|                NULL|   NULL|NULL|16446.65659309965|                 NULL|        NULL|
|    min|1st American Stat...|Acworth|  AL|               91|      1st United Bank|    1-Aug-08|
|    max|               ebank|Wyoming|  WY|            58701|  Your Community Bank|    9-Sep-11|
+-------+--------------------+-------+----+-----------------+---------------------+------------+



In [9]:
df.describe('City', 'ST').show()

+-------+-------+----+
|summary|   City|  ST|
+-------+-------+----+
|  count|    561| 561|
|   mean|   NULL|NULL|
| stddev|   NULL|NULL|
|    min|Acworth|  AL|
|    max|Wyoming|  WY|
+-------+-------+----+



## Count, Columns and Schema

In [10]:
print('df.count: ', df.count())
print('df columns: ', len(df.columns))
print('df.columns: ', df.columns)
print('df dtypes: ', df.dtypes)
print('df schema: ', df.schema)

df.count:  561
df columns:  6
df.columns:  ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']
df dtypes:  [('Bank Name', 'string'), ('City', 'string'), ('ST', 'string'), ('CERT', 'int'), ('Acquiring Institution', 'string'), ('Closing Date', 'string')]
df schema:  StructType([StructField('Bank Name', StringType(), True), StructField('City', StringType(), True), StructField('ST', StringType(), True), StructField('CERT', IntegerType(), True), StructField('Acquiring Institution', StringType(), True), StructField('Closing Date', StringType(), True)])


In [11]:
df.printSchema()

root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: integer (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)



## Remove Duplicates

In [12]:
df = df.dropDuplicates()
print('df.count: ', df.count())
print('df.columns: ', df.columns)

df.count:  561
df.columns:  ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date']


## Select Specific Columns

In [13]:
df2 = df.select(*['Bank Name', 'City'])
df2.show(2)

+--------------------+--------+
|           Bank Name|    City|
+--------------------+--------+
| First Bank of Idaho| Ketchum|
|Amcore Bank, Nati...|Rockford|
+--------------------+--------+
only showing top 2 rows



## Select Multiple Columns

In [14]:
col_l = list(set(df.columns) - {'CERT', 'ST'})
df2 = df.select(*col_l)
df2.show(5)

+------------+--------------------+---------------------+--------+
|Closing Date|           Bank Name|Acquiring Institution|    City|
+------------+--------------------+---------------------+--------+
|   24-Apr-09| First Bank of Idaho|      U.S. Bank, N.A.| Ketchum|
|   23-Apr-10|Amcore Bank, Nati...|          Harris N.A.|Rockford|
|   11-Sep-09|        Venture Bank| First-Citizens Ba...|   Lacey|
|   31-Jul-09|First State Bank ...|         Herring Bank|   Altus|
|   11-Dec-09|Valley Capital Ba...| Enterprise Bank &...|    Mesa|
+------------+--------------------+---------------------+--------+
only showing top 5 rows



## Rename Columns

In [15]:
df2 = df \
    .withColumnRenamed('Bank Name', 'bank_name') \
    .withColumnRenamed('City', 'city') \
    .withColumnRenamed('Acquiring Institution', 'acq_institution') \
    .withColumnRenamed('Closing Date', 'closing_date') \
    .withColumnRenamed('ST', 'state') \
    .withColumnRenamed('CERT', 'cert')

df2.show()

+--------------------+----------------+-----+-----+--------------------+------------+
|           bank_name|            city|state| cert|     acq_institution|closing_date|
+--------------------+----------------+-----+-----+--------------------+------------+
| First Bank of Idaho|         Ketchum|   ID|34396|     U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|        Rockford|   IL| 3735|         Harris N.A.|   23-Apr-10|
|        Venture Bank|           Lacey|   WA|22868|First-Citizens Ba...|   11-Sep-09|
|First State Bank ...|           Altus|   OK| 9873|        Herring Bank|   31-Jul-09|
|Valley Capital Ba...|            Mesa|   AZ|58399|Enterprise Bank &...|   11-Dec-09|
|Michigan Heritage...|Farmington Hills|   MI|34369|      Level One Bank|   24-Apr-09|
|Columbia Savings ...|      Cincinnati|   OH|32284|United Fidelity B...|   23-May-14|
|       Fidelity Bank|        Dearborn|   MI|33883|The Huntington Na...|   30-Mar-12|
|The Park Avenue Bank|        Valdosta|   GA|19797|  B

## Add Columns

In [16]:
df2 = df.withColumn('state', col('ST'))
df2.show(5)

+--------------------+--------+---+-----+---------------------+------------+-----+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|state|
+--------------------+--------+---+-----+---------------------+------------+-----+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|
|        Venture Bank|   Lacey| WA|22868| First-Citizens Ba...|   11-Sep-09|   WA|
|First State Bank ...|   Altus| OK| 9873|         Herring Bank|   31-Jul-09|   OK|
|Valley Capital Ba...|    Mesa| AZ|58399| Enterprise Bank &...|   11-Dec-09|   AZ|
+--------------------+--------+---+-----+---------------------+------------+-----+
only showing top 5 rows



## Add constant column

In [17]:
df2 = df2.withColumn('country', lit('US'))
df2.show()

+--------------------+----------------+---+-----+---------------------+------------+-----+-------+
|           Bank Name|            City| ST| CERT|Acquiring Institution|Closing Date|state|country|
+--------------------+----------------+---+-----+---------------------+------------+-----+-------+
| First Bank of Idaho|         Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|   ID|     US|
|Amcore Bank, Nati...|        Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|   IL|     US|
|        Venture Bank|           Lacey| WA|22868| First-Citizens Ba...|   11-Sep-09|   WA|     US|
|First State Bank ...|           Altus| OK| 9873|         Herring Bank|   31-Jul-09|   OK|     US|
|Valley Capital Ba...|            Mesa| AZ|58399| Enterprise Bank &...|   11-Dec-09|   AZ|     US|
|Michigan Heritage...|Farmington Hills| MI|34369|       Level One Bank|   24-Apr-09|   MI|     US|
|Columbia Savings ...|      Cincinnati| OH|32284| United Fidelity B...|   23-May-14|   OH|     US|
|       Fi

## Drop columns

In [18]:
df2 = df2.drop('CERT')
df2.show(5)

+--------------------+--------+---+---------------------+------------+-----+-------+
|           Bank Name|    City| ST|Acquiring Institution|Closing Date|state|country|
+--------------------+--------+---+---------------------+------------+-----+-------+
| First Bank of Idaho| Ketchum| ID|      U.S. Bank, N.A.|   24-Apr-09|   ID|     US|
|Amcore Bank, Nati...|Rockford| IL|          Harris N.A.|   23-Apr-10|   IL|     US|
|        Venture Bank|   Lacey| WA| First-Citizens Ba...|   11-Sep-09|   WA|     US|
|First State Bank ...|   Altus| OK|         Herring Bank|   31-Jul-09|   OK|     US|
|Valley Capital Ba...|    Mesa| AZ| Enterprise Bank &...|   11-Dec-09|   AZ|     US|
+--------------------+--------+---+---------------------+------------+-----+-------+
only showing top 5 rows



## Drop multiple columns

In [19]:
df2 = df2.drop(*['CERT', 'ST'])
df2.show(5)

+--------------------+--------+---------------------+------------+-----+-------+
|           Bank Name|    City|Acquiring Institution|Closing Date|state|country|
+--------------------+--------+---------------------+------------+-----+-------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|   ID|     US|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|   IL|     US|
|        Venture Bank|   Lacey| First-Citizens Ba...|   11-Sep-09|   WA|     US|
|First State Bank ...|   Altus|         Herring Bank|   31-Jul-09|   OK|     US|
|Valley Capital Ba...|    Mesa| Enterprise Bank &...|   11-Dec-09|   AZ|     US|
+--------------------+--------+---------------------+------------+-----+-------+
only showing top 5 rows



In [20]:
df2 = reduce(DataFrame.drop, ['CERT', 'ST'], df)
df2.show(5)

+--------------------+--------+---------------------+------------+
|           Bank Name|    City|Acquiring Institution|Closing Date|
+--------------------+--------+---------------------+------------+
| First Bank of Idaho| Ketchum|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford|          Harris N.A.|   23-Apr-10|
|        Venture Bank|   Lacey| First-Citizens Ba...|   11-Sep-09|
|First State Bank ...|   Altus|         Herring Bank|   31-Jul-09|
|Valley Capital Ba...|    Mesa| Enterprise Bank &...|   11-Dec-09|
+--------------------+--------+---------------------+------------+
only showing top 5 rows



## Filter Data

In [21]:
# equal to values
df2 = df.where(df['ST'] == 'NE')

# between values
df3 = df.where(df['CERT'].between('1000', '2000'))

# is inside multiple values
df4 = df.where(df['ST'].isin('NE', 'IL'))

print('df.count: ', df.count())
print('df2.count: ', df2.count())
print('df3.count: ', df3.count())
print('df4.count: ', df4.count())

df.count:  561
df2.count:  4
df3.count:  9
df4.count:  73


## Filter data using logical operators

In [22]:
df2 = df.where((df['ST'] == 'NE') & (df['City'] == 'Ericson'))
df2.show()

+------------------+-------+---+-----+---------------------+------------+
|         Bank Name|   City| ST| CERT|Acquiring Institution|Closing Date|
+------------------+-------+---+-----+---------------------+------------+
|Ericson State Bank|Ericson| NE|18265| Farmers and Merch...|   14-Feb-20|
+------------------+-------+---+-----+---------------------+------------+



## Replace values in DataFrame

In [23]:
# pre replace
df.show(2)
# pos replace
print('replace 7 in the above dataframe with 17 at all instances')
df.na.replace(7, 17).show(2)

+--------------------+--------+---+-----+---------------------+------------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+------------+
only showing top 2 rows

replace 7 in the above dataframe with 17 at all instances
+--------------------+--------+---+-----+---------------------+------------+
|           Bank Name|    City| ST| CERT|Acquiring Institution|Closing Date|
+--------------------+--------+---+-----+---------------------+------------+
| First Bank of Idaho| Ketchum| ID|34396|      U.S. Bank, N.A.|   24-Apr-09|
|Amcore Bank, Nati...|Rockford| IL| 3735|          Harris N.A.|   23-Apr-10|
+--------------------+--------+---+-----+---------------------+-------

## Mão na massa

In [24]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession .builder.appName("Our First Spark Example").getOrCreate()

spark

'sudo' n�o � reconhecido como um comando interno
ou externo, um programa oper�vel ou um arquivo em lotes.
O sistema n�o pode encontrar o caminho especificado.
'wget' n�o � reconhecido como um comando interno
ou externo, um programa oper�vel ou um arquivo em lotes.
tar: Error opening archive: Failed to open 'spark-3.2.1-bin-hadoop3.2.tgz'




In [25]:
import requests
path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
req = requests.get(path)
url_content = req.content

In [26]:
csv_file_name = 'owid-covid-data.csv'
csv_file = open(csv_file_name, 'wb')

csv_file.write(url_content)
csv_file.close()

df = spark.read.csv(csv_file_name, header=True, inferSchema=True)

In [27]:
#Viewing the dataframe schema 
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: integer (nullable = true)
 |-- new_cases: integer (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- new_deaths: integer (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: integer (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: integer (nullable = true)
 |-- hosp_patients_per_mil

In [28]:
#Converting a date column
df.select(F.to_date(df.date).alias('date'))

DataFrame[date: date]

In [29]:
#Summary stats
df.describe().show()

+-------+--------+-------------+-----------+------------------+------------------+------------------+-----------------+------------------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+------------------+------------------+------------------------+------------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-------------------+------------------+------------------------+----------------------+------------------+-------------------------------+-------------------+-----------------+-------------+--------------------+-------------------+-----------------------+--------------------+------------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+------------------

In [30]:
#Simple Group by Function
df.groupBy("location").sum("new_cases").orderBy(F.desc("sum(new_cases)")).show(truncate=False)

+-----------------------------+--------------+
|location                     |sum(new_cases)|
+-----------------------------+--------------+
|World                        |775800417     |
|High-income countries        |428935387     |
|Asia                         |301554088     |
|Europe                       |252798408     |
|Upper-middle-income countries|251736497     |
|European Union (27)          |185732052     |
|North America                |124485384     |
|United States                |103436829     |
|China                        |99369029      |
|Lower-middle-income countries|92013076      |
|South America                |68820742      |
|India                        |45041192      |
|France                       |38997490      |
|Germany                      |38437756      |
|Brazil                       |37511921      |
|South Korea                  |34571873      |
|Japan                        |33803572      |
|Italy                        |26727644      |
|United Kingd