# Extract Information PySpark




## *Instructor: José David Arévalo*

- email: <jdaarevalo@gmail.com>
- twitter: [@jdaarevalo](https://twitter.com/jdaarevalo)
- linkedin: [jdavidarevalo](https://www.linkedin.com/in/jdavidarevalo/)
- github: [jdaarevalo](https://github.com/jdaarevalo)



###### Febrero, 2020


In [1]:
# Importamos los modulos necesarios
# import pyspark  #python package

import findspark
findspark.init()

import pyspark

In [2]:
pyspark

<module 'pyspark' from '/usr/local/spark/python/pyspark/__init__.py'>

In [3]:
# si no logra hacer el import de pyspark, sera necesario instalarlo
# usar !pip install pyspark

In [56]:
# objeto principal o la base a partir de la cual cuelga toda la funcionalidad de Apache Spark
from pyspark.sql import SparkSession

# Se trata del context básico de Spark, desde donde se crean el resto de variables 
# que maneja el framework. Sólo un SparkContext puede estar activo por JVM.
from pyspark import SparkContext

# el objeto functions del modulo sql define las funciones estándar incorporadas 
# para trabajar con (valores producidos) columnas.
import pyspark.sql.functions as func

from datetime import datetime

In [5]:
# create a new spark session, que sera la base para nuestra aplicacion

spark = SparkSession.builder\
                    .appName("Test")\
                    .getOrCreate()

#spark sera el punto de entrada para la aplicacion

In [6]:
spark

In [35]:
ls data/

IBRD_Statement_Of_Loans_-_Historical_Data.csv
[31mtrain.csv[m[m*


# Read csv

In [8]:
df = spark.read.format("csv").option("header", "true").load("data/train.csv")

In [9]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [10]:
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("data/train.csv")

In [11]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [12]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

### url to dowload the data
https://finances.worldbank.org/Loans-and-Credits/IBRD-Statement-Of-Loans-Historical-Data/zucq-nrc3

In [37]:
df_big = spark.read\
            .format("csv")\
            .option("header", "true")\
            .load("data/IBRD_Statement_Of_Loans_-_Historical_Data.csv")

# Basic Spark Methods

In [38]:
#we can use the columns attribute just like with pandas
df_big.columns

['End of Period',
 'Loan Number',
 'Region',
 'Country Code',
 'Country',
 'Borrower',
 'Guarantor Country Code',
 'Guarantor',
 'Loan Type',
 'Loan Status',
 'Interest Rate',
 'Currency of Commitment',
 'Project ID',
 'Project Name ',
 'Original Principal Amount',
 'Cancelled Amount',
 'Undisbursed Amount',
 'Disbursed Amount',
 'Repaid to IBRD',
 'Due to IBRD',
 'Exchange Adjustment',
 "Borrower's Obligation",
 'Sold 3rd Party',
 'Repaid 3rd Party',
 'Due 3rd Party',
 'Loans Held',
 'First Repayment Date',
 'Last Repayment Date',
 'Agreement Signing Date',
 'Board Approval Date',
 'Effective Date (Most Recent)',
 'Closed Date (Most Recent)',
 'Last Disbursement Date']

In [39]:
# number of rows using the .count()
df_big.count()

872665

In [40]:
#show first 5 rows
df_big.show(5)

+--------------------+-----------+--------------------+------------+---------+--------------------+----------------------+---------+---------+-----------+-------------+----------------------+----------+--------------------+-------------------------+----------------+------------------+----------------+--------------+-----------+-------------------+---------------------+--------------+----------------+-------------+----------+--------------------+--------------------+----------------------+--------------------+----------------------------+-------------------------+----------------------+
|       End of Period|Loan Number|              Region|Country Code|  Country|            Borrower|Guarantor Country Code|Guarantor|Loan Type|Loan Status|Interest Rate|Currency of Commitment|Project ID|       Project Name |Original Principal Amount|Cancelled Amount|Undisbursed Amount|Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|Sold 3rd Party|Repaid 3rd Party|Due 

In [42]:
#show first row
df_big.head()

Row(End of Period='01/31/2015 12:00:00 AM', Loan Number='IBRD71750', Region='Latin America and Caribbean', Country Code='GT', Country='Guatemala', Borrower='Ministerio de Finanzas P?blicas', Guarantor Country Code='GT', Guarantor='Guatemala', Loan Type='FSL', Loan Status='Terminated', Interest Rate='0', Currency of Commitment=None, Project ID='P064883', Project Name ='GT WESTERN ALTIPLANO NRM', Original Principal Amount='32800000', Cancelled Amount='32800000', Undisbursed Amount='0', Disbursed Amount='0', Repaid to IBRD='0', Due to IBRD='0', Exchange Adjustment='0', Borrower's Obligation='0', Sold 3rd Party='0', Repaid 3rd Party='0', Due 3rd Party='0', Loans Held='0', First Repayment Date=None, Last Repayment Date=None, Agreement Signing Date=None, Board Approval Date='05/27/2003 12:00:00 AM', Effective Date (Most Recent)='05/27/2003 12:00:00 AM', Closed Date (Most Recent)='12/23/2004 12:00:00 AM', Last Disbursement Date=None)

In [43]:
type(df_big.head())

pyspark.sql.types.Row

In [44]:
df_big.describe()

DataFrame[summary: string, End of Period: string, Loan Number: string, Region: string, Country Code: string, Country: string, Borrower: string, Guarantor Country Code: string, Guarantor: string, Loan Type: string, Loan Status: string, Interest Rate: string, Currency of Commitment: string, Project ID: string, Project Name : string, Original Principal Amount: string, Cancelled Amount: string, Undisbursed Amount: string, Disbursed Amount: string, Repaid to IBRD: string, Due to IBRD: string, Exchange Adjustment: string, Borrower's Obligation: string, Sold 3rd Party: string, Repaid 3rd Party: string, Due 3rd Party: string, Loans Held: string, First Repayment Date: string, Last Repayment Date: string, Agreement Signing Date: string, Board Approval Date: string, Effective Date (Most Recent): string, Closed Date (Most Recent): string, Last Disbursement Date: string]

In [45]:
df_big.describe().show()

+-------+--------------------+-----------+----------+------------+--------+--------------------+----------------------+---------+----------+-----------+------------------+----------------------+-----------------+------------------+-------------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+---------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+----------------------------+-------------------------+----------------------+
|summary|       End of Period|Loan Number|    Region|Country Code| Country|            Borrower|Guarantor Country Code|Guarantor| Loan Type|Loan Status|     Interest Rate|Currency of Commitment|       Project ID|     Project Name |Original Principal Amount|   Cancelled Amount| Undisbursed Amount|    Disbursed Amount|     Repaid to IBRD|        Due to IB

In [79]:
pdf_describe = df_big.describe().toPandas()

In [80]:
pdf_describe

Unnamed: 0,summary,End of Period,Loan Number,Region,Country Code,Country,Borrower,Guarantor Country Code,Guarantor,Loan Type,...,Repaid 3rd Party,Due 3rd Party,Loans Held,First Repayment Date,Last Repayment Date,Agreement Signing Date,Board Approval Date,Effective Date (Most Recent),Closed Date (Most Recent),Last Disbursement Date
0,count,872665,872665,872665,872665,872665,866652,838882,811286,872617,...,872617.0,872617,872617.0,868381.0,868485,861492,872569,867346,871759,490437
1,mean,,,,,,0.6070833333333335,,,,...,443247.1273399795,0.0,27301218.72295944,53579.66933333334,,,,,,
2,stddev,,,,,,0.4057142812549054,,,,...,3871219.5476926216,0.0,114494029.3696152,273815.10520057834,,,,,,
3,min,01/31/2012 12:00:00 AM,GFMDR,AFRICA,1W,Repaid,#MULTIVALUE,1W,Albania,\tIFC LOAN,...,0.0,0,-0.01,0.0,01/01/1934 12:00:00 AM,01/01/2008 12:00:00 AM,01/01/2008 12:00:00 AM,01/01/2007 12:00:00 AM,01/01/1950 12:00:00 AM,01/01/1998 12:00:00 AM
4,max,"ka banka d.d.""",IBRDS0200,South Asia,ZW,Zimbabwe,Zimbabwe Electricity Supply Commission,ZW,jordan,SNGL CRNCY,...,997000.0,11/18/1997 12:00:00 AM,9999515.72,2045864.8,12/15/2048 12:00:00 AM,12/31/2008 12:00:00 AM,12/30/2013 12:00:00 AM,12/31/2019 12:00:00 AM,12/31/2033 12:00:00 AM,12/31/2019 12:00:00 AM


In [82]:
pdf_describe["Interest Rate"]

0                845357
1    216.74088387326998
2    28138.860389781774
3                     0
4                  9.97
Name: Interest Rate, dtype: object

In [77]:
df_big.groupBy("Country").count().show()

+-------------+-----+
|      Country|count|
+-------------+-----+
|         Chad|  104|
|     Paraguay| 6026|
|        World|33907|
|      Senegal| 2414|
|   Cabo Verde|   78|
|Taiwan, China| 1470|
|       Guyana| 1260|
|  Philippines|27462|
|     Malaysia|13319|
|    Singapore| 1470|
|         Fiji| 1526|
|       Turkey|30820|
|       Malawi| 1050|
|         Iraq| 1088|
|       Jordan| 8571|
|        Sudan|  840|
|       France|  106|
|       Greece| 1785|
|       Kosovo|   93|
|    Sri Lanka| 1492|
+-------------+-----+
only showing top 20 rows



In [83]:
df_big.filter(df_big['Interest Rate'] > 216.74).show()

+--------------+-----------+-------+------------+--------+--------+----------------------+---------+-------------------+-----------+-------------+----------------------+----------+-------------+-------------------------+----------------+------------------+----------------+--------------+-----------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------------+----------------------------+-------------------------+----------------------+
| End of Period|Loan Number| Region|Country Code| Country|Borrower|Guarantor Country Code|Guarantor|          Loan Type|Loan Status|Interest Rate|Currency of Commitment|Project ID|Project Name |Original Principal Amount|Cancelled Amount|Undisbursed Amount|Disbursed Amount|Repaid to IBRD|Due to IBRD|Exchange Adjustment|Borrower's Obligation|      Sold 3rd Party|    Repaid 3rd Party|       Due 3rd Party|     

In [85]:
df_big.select("Country", "Interest Rate").show()

+-----------+-------------+
|    Country|Interest Rate|
+-----------+-------------+
|  Guatemala|            0|
|      World|         5.56|
|      World|         5.58|
|      World|            0|
|      World|        10.65|
|      World|        10.65|
|     Brazil|            0|
| Montenegro|            0|
|Philippines|            0|
|     Mexico|            0|
|  Indonesia|            0|
|  Sri Lanka|            0|
|    Tunisia|            0|
|    Tunisia|            0|
| Kazakhstan|            0|
|    Romania|            0|
|    Morocco|            0|
|    Croatia|            0|
|    Croatia|            0|
|    Moldova|            0|
+-----------+-------------+
only showing top 20 rows



In [111]:
df_big.select(func.col("Country"), func.col("Interest Rate")).show()

+-----------+-------------+
|    Country|Interest Rate|
+-----------+-------------+
|  Guatemala|            0|
|      World|         5.56|
|      World|         5.58|
|      World|            0|
|      World|        10.65|
|      World|        10.65|
|     Brazil|            0|
| Montenegro|            0|
|Philippines|            0|
|     Mexico|            0|
|  Indonesia|            0|
|  Sri Lanka|            0|
|    Tunisia|            0|
|    Tunisia|            0|
| Kazakhstan|            0|
|    Romania|            0|
|    Morocco|            0|
|    Croatia|            0|
|    Croatia|            0|
|    Moldova|            0|
+-----------+-------------+
only showing top 20 rows



In [87]:
df_big.select(df_big["Country"], df_big["Interest Rate"]).show()

+-----------+-------------+
|    Country|Interest Rate|
+-----------+-------------+
|  Guatemala|            0|
|      World|         5.56|
|      World|         5.58|
|      World|            0|
|      World|        10.65|
|      World|        10.65|
|     Brazil|            0|
| Montenegro|            0|
|Philippines|            0|
|     Mexico|            0|
|  Indonesia|            0|
|  Sri Lanka|            0|
|    Tunisia|            0|
|    Tunisia|            0|
| Kazakhstan|            0|
|    Romania|            0|
|    Morocco|            0|
|    Croatia|            0|
|    Croatia|            0|
|    Moldova|            0|
+-----------+-------------+
only showing top 20 rows



In [88]:
df_big.select(df_big["Country"], df_big["Interest Rate"]+1).show()

+-----------+-------------------+
|    Country|(Interest Rate + 1)|
+-----------+-------------------+
|  Guatemala|                1.0|
|      World|               6.56|
|      World|               6.58|
|      World|                1.0|
|      World|              11.65|
|      World|              11.65|
|     Brazil|                1.0|
| Montenegro|                1.0|
|Philippines|                1.0|
|     Mexico|                1.0|
|  Indonesia|                1.0|
|  Sri Lanka|                1.0|
|    Tunisia|                1.0|
|    Tunisia|                1.0|
| Kazakhstan|                1.0|
|    Romania|                1.0|
|    Morocco|                1.0|
|    Croatia|                1.0|
|    Croatia|                1.0|
|    Moldova|                1.0|
+-----------+-------------------+
only showing top 20 rows



In [201]:
df_big.groupBy("Country").agg(func.max("Interest Rate").alias("max"), func.sum("Interest Rate")).show()

+-------------+----+------------------+
|      Country| max|sum(Interest Rate)|
+-------------+----+------------------+
|         Chad|   3| 283.2600000000001|
|     Paraguay| 9.6| 24698.88499999993|
|        World|9.97|257384.97000000306|
|      Senegal| 9.6| 19480.09999999997|
|   Cabo Verde|   0|               0.0|
|Taiwan, China|7.25|           8665.25|
|       Guyana|9.25|  9626.80000000001|
|  Philippines| 9.6|143551.30999999962|
|     Malaysia| 9.6|  90615.4899999999|
|    Singapore|   9|            9572.0|
|         Fiji|9.25| 9278.989999999994|
|       Turkey| 9.6|143272.31499999948|
|       Malawi| 9.6| 7488.299999999984|
|         Iraq|7.25|           4030.75|
|       Jordan| 9.6|28721.160000000094|
|        Sudan| 8.2| 5010.615000000002|
|       France|4.25|            445.25|
|       Greece|   9|12926.449999999992|
|       Kosovo|   0|               0.0|
|    Sri Lanka|7.22| 7600.165000000001|
+-------------+----+------------------+
only showing top 20 rows



## The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame.



In [104]:
df_big.select("Country", "Interest Rate").createOrReplaceTempView("country_interest_rate")

In [108]:
spark.sql("""SELECT *
          FROM country_interest_rate
          """).show()


+-----------+-------------+
|    Country|Interest Rate|
+-----------+-------------+
|  Guatemala|            0|
|      World|         5.56|
|      World|         5.58|
|      World|            0|
|      World|        10.65|
|      World|        10.65|
|     Brazil|            0|
| Montenegro|            0|
|Philippines|            0|
|     Mexico|            0|
|  Indonesia|            0|
|  Sri Lanka|            0|
|    Tunisia|            0|
|    Tunisia|            0|
| Kazakhstan|            0|
|    Romania|            0|
|    Morocco|            0|
|    Croatia|            0|
|    Croatia|            0|
|    Moldova|            0|
+-----------+-------------+
only showing top 20 rows



In [117]:
df_big.select(func.col("Country"), func.col("Interest Rate").alias("interest_rate"))\
    .createOrReplaceTempView("country_interest_rate")

In [120]:
df_country_irate = spark.sql("""SELECT *
                      FROM country_interest_rate
                      WHERE interest_rate = 0
                      """)

# Parquet

## Save/Load a parquet

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

In [130]:
date_file = datetime.now().strftime("%m-%d--%H-%S-%s")

path_to_parquet = "output_data/df/{}".format(date_file)

In [131]:
df_country_irate.write.parquet(path_to_parquet)

In [132]:
spark.read.load(path_to_parquet).show()

+--------------------+-------------+
|             Country|interest_rate|
+--------------------+-------------+
|             Morocco|         0.69|
|           Indonesia|         0.93|
|            Bulgaria|         0.38|
|Macedonia, former...|            0|
|           Indonesia|         0.93|
|               Chile|            0|
|             Morocco|         0.39|
|           Indonesia|         0.87|
|           Indonesia|         0.87|
|             Croatia|         0.39|
|             Romania|         0.39|
|             Morocco|         0.39|
|            Bulgaria|         0.38|
|            Bulgaria|         0.38|
|            Bulgaria|         0.38|
|             Morocco|         0.39|
|             Morocco|         0.39|
|            Bulgaria|         0.38|
|           Indonesia|         0.87|
|           Indonesia|         0.87|
+--------------------+-------------+
only showing top 20 rows



## Run SQL on files directly

In [133]:
path_to_parquet

'output_data/df/02-08--14-30-1581189090'

In [154]:
df_parquet = spark.sql("""
                        SELECT
                            max(interest_rate) AS max_interest_rate
                            , min(interest_rate) AS min_interest_rate
                            , avg(interest_rate) AS avg_interest_rate
                            , Country
                        FROM parquet.`{}`
                        --WHERE interest_rate > 0.9
                        GROUP BY Country
                        """.format(path_to_parquet))

In [155]:
df_parquet.show()

+-----------------+-----------------+--------------------+------------------+
|max_interest_rate|min_interest_rate|   avg_interest_rate|           Country|
+-----------------+-----------------+--------------------+------------------+
|             0.94|                0| 0.03667334669338678|          Paraguay|
|             0.69|                0|0.006888185654008...|             World|
|                0|                0|                 0.0|        Cabo Verde|
|                0|                0|                 0.0|     Taiwan, China|
|             0.99|                0| 0.20529373789541647|       Philippines|
|                0|                0|                 0.0|          Malaysia|
|                0|                0|                 0.0|              Fiji|
|             0.99|                0|  0.1428481984642647|            Turkey|
|                0|                0|                 0.0|              Iraq|
|             0.99|                0|  0.2613406380921692|      

In [158]:
df_country_irate.count()

228358

In [166]:
path_to_parquet_by_country = "output_data/namesPartByCountry.parquet"

In [161]:
df_country_irate.write.partitionBy("Country").format("parquet").save(path_to_parquet_by_country)

In [184]:
ls "output_data/"

[34mdf[m[m/                         [34mnamesPartByCountry.parquet[m[m/


In [192]:
%%time
spark.sql("""
            SELECT
                sum(*)
            FROM parquet.`{}/Country=Ch*`
            """.format(path_to_parquet_by_country)).show()

+----------------------------------+
|sum(CAST(interest_rate AS DOUBLE))|
+----------------------------------+
|                 5210.539999999902|
+----------------------------------+

CPU times: user 1.21 ms, sys: 1.31 ms, total: 2.53 ms
Wall time: 195 ms


# Querying the data

# Read table postgres

In [13]:
# create a new spark session, que sera la base para nuestra aplicacion

spark = SparkSession.builder\
                    .appName("Test")\
                    .config("spark.jars.packages", "org.postgresql:postgresql:9.4.1211")\
                    .getOrCreate()

#spark sera el punto de entrada para la aplicacion
# el package se pude descargar de https://jdbc.postgresql.org/download.html

In [14]:
# variables de conexion a la base de datos
jdbcPort = 5432                                                                           

jdbcHostname = "YOUR_HOST"
jdbcDatabase = "YOUR_DB_NAME"                                       
jdbcUsername= "YOUR_USERNAME"                                                              
jdbcPassword= "YOUR_PASS"
# jdbc url para conectarse a la base de datos
jdbcUrl = "jdbc:postgresql://{0}:{1}/{2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

# propiedades para la conexion a la base de datos
connectionProperties = {                                                                  
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver": 'org.postgresql.Driver',
  "ssl":'true',
  "sslfactory":'org.postgresql.ssl.NonValidatingFactory',
  "stringtype": "unspecified"                                                             
}

In [15]:
query="""(SELECT * FROM pg_catalog.pg_tables) AS data_table"""

In [None]:
df_tables = spark.read.jdbc(url=jdbcUrl,
                              table=query,
                              properties=connectionProperties)

In [None]:
df_tables.show()

In [None]:
query="""(SELECT * FROM pg_catalog.pg_tables where tableowner = 'sjkkapmzuixjbz') AS data_table"""

In [None]:
df_tables = spark.read.jdbc(url=jdbcUrl,
                              table=query,
                              properties=connectionProperties)

In [None]:
df_tables.show()