# Extracting Information with PySpark




## *Instructor: José David Arévalo*

- email: <jdaarevalo@gmail.com>
- twitter: [@jdaarevalo](https://twitter.com/jdaarevalo)
- linkedin: [jdavidarevalo](https://www.linkedin.com/in/jdavidarevalo/)
- github: [jdaarevalo](https://github.com/jdaarevalo)



###### Octubre 05, 2019


In [1]:
# Importamos los modulos necesarios
import pyspark  #python package

In [2]:
# Si no se logra importar pyspark, será necesario instalarlo:
# usar %pip install pyspark (instala el paquete en el entorno del kernel)

In [3]:
# objeto principal o la base a partir de la cual cuelga toda la funcionalidad de Apache Spark
from pyspark.sql import SparkSession

# Se trata del contexto básico de Spark, desde donde se crean el resto de variables
# que maneja el framework. Sólo un SparkContext puede estar activo por JVM.
from pyspark import SparkContext

# El módulo pyspark.sql.functions define las funciones estándar incorporadas
# para operar sobre columnas (y los valores que estas producen).
import pyspark.sql.functions as func

In [4]:
# Create (or reuse, if one is already running) the SparkSession that acts as
# the entry point for the rest of the application.
spark = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [5]:
# Display the SparkSession object as this cell's output (its notebook repr).
spark

In [7]:
# IPython shell command, not Python: lists the files under data/.
# Relies on IPython's `ls` alias with automagic enabled; `!ls data/` is the explicit form.
ls data/

[31mtrain.csv[m[m*


# Read csv

In [9]:
# Read the Titanic CSV using its first line as the header. Without a schema or
# inferSchema, every column is loaded as a string.
df = spark.read.csv("data/train.csv", header=True)

In [10]:
# Without inferSchema, the CSV reader typed every column as string (see output below).
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
# Re-read the same CSV, this time letting Spark infer column types from the
# data. Per the Spark docs, inferSchema costs one extra pass over the file.
df = spark.read.csv("data/train.csv", header=True, inferSchema=True)

In [12]:
# With inferSchema, numeric columns now come back as integer/double instead of string.
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [13]:
# Preview the first 20 rows; string values longer than 20 characters are truncated.
df.show(n=20, truncate=True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

# Read a PostgreSQL table

In [23]:
# Create the SparkSession with the PostgreSQL JDBC driver package on the classpath.
# The driver can also be downloaded from https://jdbc.postgresql.org/download.html
#
# `spark.jars.packages` is only honoured when the SparkContext/JVM is first started.
# If a session already exists (e.g. one created earlier in this notebook),
# getOrCreate() returns that session and the package is NOT loaded. The JDBC reads
# that use the 'org.postgresql.Driver' class would then likely fail.
import warnings

POSTGRES_JDBC_PACKAGE = "org.postgresql:postgresql:9.4.1211"

spark = (
    SparkSession.builder
    .appName("Test")
    .config("spark.jars.packages", POSTGRES_JDBC_PACKAGE)
    .getOrCreate()
)

# Inspect the configuration the running SparkContext was actually started with;
# this reveals whether the package above took effect or was ignored.
_loaded_packages = spark.sparkContext.getConf().get("spark.jars.packages") or ""
if POSTGRES_JDBC_PACKAGE not in _loaded_packages:
    warnings.warn(
        "spark.jars.packages was not applied because a SparkSession was already "
        "running; restart the kernel and run this cell before creating any other "
        "SparkSession (or add the package to the first session's config)."
    )

In [24]:
# Database connection settings.
# SECURITY: credentials must never be hardcoded in a notebook. A password that was
# ever committed in plain text must be treated as compromised and rotated.
# Values are read from the standard libpq environment variables
# (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD). If PGPASSWORD is not set,
# the password is prompted for interactively.
import getpass
import os

jdbcPort = int(os.environ.get("PGPORT", "5432"))

jdbcHostname = os.environ["PGHOST"]
jdbcDatabase = os.environ["PGDATABASE"]
jdbcUsername = os.environ["PGUSER"]
jdbcPassword = os.environ.get("PGPASSWORD") or getpass.getpass("PostgreSQL password: ")

# JDBC URL used to connect to the database
jdbcUrl = "jdbc:postgresql://{0}:{1}/{2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

# Connection properties passed to the PostgreSQL JDBC driver
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "org.postgresql.Driver",
    "ssl": "true",
    # NOTE(review): per the pgjdbc docs, NonValidatingFactory encrypts the
    # connection but does not verify the server certificate, so it offers no
    # protection against man-in-the-middle attacks. Prefer certificate
    # verification (sslmode=verify-full) when the provider's CA cert is available.
    "sslfactory": "org.postgresql.ssl.NonValidatingFactory",
    # Per the pgjdbc docs: send setString() parameters untyped so the server infers their type.
    "stringtype": "unspecified",
}

In [25]:
# Catalog query wrapped as an aliased subquery, so it can be passed where
# spark.read.jdbc expects a table name.
query = "(SELECT * FROM pg_catalog.pg_tables) AS data_table"

In [26]:
# Run the aliased subquery in `query` over JDBC and load the result as a DataFrame.
df_tables = spark.read.jdbc(
    jdbcUrl,
    query,
    properties=connectionProperties,
)

In [27]:
# Preview up to 20 catalog rows, truncating long string values.
df_tables.show(n=20, truncate=True)

+----------+----------------+--------------+----------+----------+--------+-----------+-----------+
|schemaname|       tablename|    tableowner|tablespace|hasindexes|hasrules|hastriggers|rowsecurity|
+----------+----------------+--------------+----------+----------+--------+-----------+-----------+
|pg_catalog|    pg_statistic|      postgres|      null|      true|   false|      false|      false|
|    public|        category|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|pg_catalog|pg_foreign_table|      postgres|      null|      true|   false|      false|      false|
|    public|            city|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|           event|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|           group|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|pg_catalog|       pg_authid|      postgres| pg_global|      true|   false|      false|      false|


In [28]:
# List only the tables owned by the connecting role. PostgreSQL resolves
# `current_user` at query time, so the login name is not hardcoded here. The
# query therefore keeps working when the credentials change, assuming the session
# does not run SET ROLE.
query = """(SELECT * FROM pg_catalog.pg_tables WHERE tableowner = current_user) AS data_table"""

In [29]:
# Execute the owner-filtered subquery in `query` over JDBC.
df_tables = spark.read.jdbc(
    jdbcUrl,
    query,
    properties=connectionProperties,
)

In [32]:
# Preview up to 20 of the filtered rows, truncating long string values.
df_tables.show(n=20, truncate=True)

+----------+------------+--------------+----------+----------+--------+-----------+-----------+
|schemaname|   tablename|    tableowner|tablespace|hasindexes|hasrules|hastriggers|rowsecurity|
+----------+------------+--------------+----------+----------+--------+-----------+-----------+
|    public|    category|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|        city|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|       event|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|       group|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public| group_topic|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|      member|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|member_topic|sjkkapmzuixjbz|      null|     false|   false|      false|      false|
|    public|       topic|sjkkapmzuixjbz|