# EDA Covid

In [2]:
# !curl -o /tmp/postgresql-42.1.4.jar https://jdbc.postgresql.org/download/postgresql-42.1.4.jar

In [3]:
# !pyspark --driver-class-path /tmp/postgresql-42.1.4.jar --jars /tmp/postgresql-42.1.4.jar

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrameReader, SQLContext, SparkSession

import os

## Mode 1

In [3]:
spark = SparkSession \
    .builder \
    .appName("DataSUS") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.2.22.jar") \
    .master('spark://wilian-ubuntu:7077') \
    .getOrCreate()

In [4]:
spark.sql("SHOW TABLES")

DataFrame[database: string, tableName: string, isTemporary: boolean]

In [8]:
df = spark.read.format("jdbc"). \
options(
         url='jdbc:postgresql://localhost:5432/datasus', # jdbc:postgresql://<host>:<port>/<database>
         dbtable='sia_pa_rs',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver',
        #partitionColumn='index',
        ).\
load()

In [9]:
df.printSchema()

root
 |-- index: long (nullable = true)
 |-- pa_gestao: long (nullable = true)
 |-- pa_condic: string (nullable = true)
 |-- pa_ufmun: long (nullable = true)
 |-- pa_regct: long (nullable = true)
 |-- pa_incout: long (nullable = true)
 |-- pa_incurg: long (nullable = true)
 |-- pa_tpups: long (nullable = true)
 |-- pa_tippre: long (nullable = true)
 |-- pa_mn_ind: string (nullable = true)
 |-- pa_cnpjcpf: long (nullable = true)
 |-- pa_cnpjmnt: long (nullable = true)
 |-- pa_cnpj_cc: long (nullable = true)
 |-- pa_mvm: long (nullable = true)
 |-- pa_cmp: long (nullable = true)
 |-- pa_proc_id: long (nullable = true)
 |-- pa_tpfin: long (nullable = true)
 |-- pa_subfin: long (nullable = true)
 |-- pa_nivcpl: long (nullable = true)
 |-- pa_docorig: string (nullable = true)
 |-- pa_autoriz: string (nullable = true)
 |-- pa_cnsmed: long (nullable = true)
 |-- pa_cbocod: string (nullable = true)
 |-- pa_motsai: long (nullable = true)
 |-- pa_obito: long (nullable = true)
 |-- pa_encerr: lon

In [12]:
# df.sample(.5).show(5)

In [7]:
# spark.sql('select count(1) from datasus.sia_pa_rs').show()

In [None]:
# df.count()

In [None]:
df.select("*").toPandas()

In [None]:
df.select('*').collect()

In [None]:
# df.show(1)
# df.take(1)


In [6]:
df.createOrReplaceTempView("tableA")

In [7]:
teenagerDF = spark.sql("SELECT min(PA_IDADE) FROM tableA WHERE PA_CIDPRI = 'H57'")
teenagerDF.show()

+-------------+
|min(PA_IDADE)|
+-------------+
|            0|
+-------------+



In [8]:
teenagerDF = spark.sql("SELECT max(PA_IDADE) FROM tableA WHERE PA_CIDPRI = 'H57'")
teenagerDF.show()

+-------------+
|max(PA_IDADE)|
+-------------+
|           76|
+-------------+



In [None]:
# spark.sql("SELECT count(*) from tableA").show()

## Mode 2

In [2]:
# sparkClassPath = os.getenv('SPARK_CLASSPATH', '/opt/spark/jars/postgresql-42.2.22.jar')

# # Populate configuration
# conf = SparkConf()
# conf.setAppName('DataSUSCtx')
# conf.set('spark.jars', 'file:%s' % sparkClassPath)
# conf.set('spark.executor.extraClassPath', sparkClassPath)
# conf.set('spark.driver.extraClassPath', sparkClassPath)

# conf.set('spark.driver.cores', '1')
# conf.set('spark.executor.cores', '1')
# conf.set('spark.driver.memory', '4G')
# conf.set('spark.executor.memory', '4G')

# conf.set('spark.master', 'spark://wilian-ubuntu:7077')

<pyspark.conf.SparkConf at 0x7ff3a7064898>

In [3]:
# spark = SparkSession.builder.getOrCreate()

In [3]:
# sc = SparkContext(conf=conf)

# sqlContext = SQLContext(sc)

In [2]:
spark = SparkSession \
    .builder \
    .appName("DataSUS") \
    .config("spark.jars", "/opt/spark/jars/postgresql-42.2.22.jar") \
    .master('spark://wilian-ubuntu:7077') \
    .getOrCreate()

In [5]:
sc = spark.sparkContext

sqlContext=SQLContext(sc)

In [6]:
spark.sql("SHOW TABLES")

DataFrame[database: string, tableName: string, isTemporary: boolean]

In [9]:
url = 'postgresql://127.0.0.1:5432/datasus'
properties = {'user':'postgres', 'password':'postgres', 'driver':'org.postgresql.Driver'}

In [10]:
df = DataFrameReader(sqlContext).jdbc(url='jdbc:%s' % url, table='sia_pa_rs', properties=properties)

In [11]:
df.printSchema()

root
 |-- index: long (nullable = true)
 |-- pa_gestao: long (nullable = true)
 |-- pa_condic: string (nullable = true)
 |-- pa_ufmun: long (nullable = true)
 |-- pa_regct: long (nullable = true)
 |-- pa_incout: long (nullable = true)
 |-- pa_incurg: long (nullable = true)
 |-- pa_tpups: long (nullable = true)
 |-- pa_tippre: long (nullable = true)
 |-- pa_mn_ind: string (nullable = true)
 |-- pa_cnpjcpf: long (nullable = true)
 |-- pa_cnpjmnt: long (nullable = true)
 |-- pa_cnpj_cc: long (nullable = true)
 |-- pa_mvm: long (nullable = true)
 |-- pa_cmp: long (nullable = true)
 |-- pa_proc_id: long (nullable = true)
 |-- pa_tpfin: long (nullable = true)
 |-- pa_subfin: long (nullable = true)
 |-- pa_nivcpl: long (nullable = true)
 |-- pa_docorig: string (nullable = true)
 |-- pa_autoriz: string (nullable = true)
 |-- pa_cnsmed: long (nullable = true)
 |-- pa_cbocod: string (nullable = true)
 |-- pa_motsai: long (nullable = true)
 |-- pa_obito: long (nullable = true)
 |-- pa_encerr: lon

In [12]:
df.columns

['index',
 'pa_gestao',
 'pa_condic',
 'pa_ufmun',
 'pa_regct',
 'pa_incout',
 'pa_incurg',
 'pa_tpups',
 'pa_tippre',
 'pa_mn_ind',
 'pa_cnpjcpf',
 'pa_cnpjmnt',
 'pa_cnpj_cc',
 'pa_mvm',
 'pa_cmp',
 'pa_proc_id',
 'pa_tpfin',
 'pa_subfin',
 'pa_nivcpl',
 'pa_docorig',
 'pa_autoriz',
 'pa_cnsmed',
 'pa_cbocod',
 'pa_motsai',
 'pa_obito',
 'pa_encerr',
 'pa_perman',
 'pa_alta',
 'pa_transf',
 'pa_cidpri',
 'pa_cidsec',
 'pa_cidcas',
 'pa_catend',
 'pa_idade',
 'idademin',
 'idademax',
 'pa_flidade',
 'pa_sexo',
 'pa_racacor',
 'pa_munpcn',
 'pa_qtdpro',
 'pa_qtdapr',
 'pa_valpro',
 'pa_valapr',
 'pa_ufdif',
 'pa_mndif',
 'pa_dif_val',
 'nu_vpa_tot',
 'nu_pa_tot',
 'pa_indica',
 'pa_codoco',
 'pa_flqt',
 'pa_fler',
 'pa_etnia',
 'pa_vl_cf',
 'pa_vl_cl',
 'pa_vl_inc',
 'pa_srv_c',
 'pa_ine',
 'pa_nat_jur']

In [9]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fe511a90860>


In [10]:
print(sc)
print(sqlContext)

<SparkContext master=local[*] appName=DataSUSCtx>
<pyspark.sql.context.SQLContext object at 0x7fe52af3aef0>


In [11]:
# Resilient Distributed Dataframe (RDD)

In [12]:
df.rdd

MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:0

In [13]:
df.rdd.getNumPartitions()

1

In [14]:
df = df.repartition(4)

In [15]:
df.rdd.getNumPartitions()

4

In [12]:
# df.collect()

# Py4JJavaError: An error occurred while calling o56.collectToPython.
# org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (wilian-ubuntu executor driver): java.lang.OutOfMemoryError: Java heap space
# Driver stacktrace:

In [13]:
df.where('index == 2250802').show()

+-------+---------+---------+--------+--------+---------+---------+--------+---------+---------+--------------+----------+----------+------+------+----------+--------+---------+---------+----------+-------------+---------------+---------+---------+--------+---------+---------+-------+---------+---------+---------+---------+---------+--------+--------+--------+----------+-------+----------+---------+---------+---------+---------+---------+--------+--------+----------+----------+---------+---------+---------+-------+-------+--------+--------+--------+---------+--------+------+----------+
|  index|pa_gestao|pa_condic|pa_ufmun|pa_regct|pa_incout|pa_incurg|pa_tpups|pa_tippre|pa_mn_ind|    pa_cnpjcpf|pa_cnpjmnt|pa_cnpj_cc|pa_mvm|pa_cmp|pa_proc_id|pa_tpfin|pa_subfin|pa_nivcpl|pa_docorig|   pa_autoriz|      pa_cnsmed|pa_cbocod|pa_motsai|pa_obito|pa_encerr|pa_perman|pa_alta|pa_transf|pa_cidpri|pa_cidsec|pa_cidcas|pa_catend|pa_idade|idademin|idademax|pa_flidade|pa_sexo|pa_racacor|pa_munpcn|pa_qt

In [14]:
df.select('PA_IDADE').distinct().count()

132

In [20]:
# df.limit(10).toPandas()

In [16]:
# df.select(
#     'pa_gestao',
#     'pa_ufmun',
#     'pa_cbocod',
#     'pa_cidpri',
#     'pa_cidsec',
# ).limit(10).toPandas()

DataFrame[pa_gestao: bigint, pa_ufmun: bigint, pa_cbocod: string, pa_cidpri: string, pa_cidsec: string]

In [14]:
# df.take(1)

In [8]:
df.createOrReplaceTempView("tableA")

In [17]:
teenagerDF = spark.sql("SELECT min(PA_IDADE) FROM tableA WHERE PA_CIDPRI = 'H57'")
teenagerDF.show()

+-------------+
|min(PA_IDADE)|
+-------------+
|            0|
+-------------+



In [9]:
# df.show(1, vertical=True)

In [12]:
%%time
df.select("PA_CIDPRI", "PA_CIDSEC").describe().show()

+-------+-------------------+---------+
|summary|          PA_CIDPRI|PA_CIDSEC|
+-------+-------------------+---------+
|  count|           40722508| 40722508|
|   mean|0.04845269043950959|      0.0|
| stddev|  7.336797206585033|      0.0|
|    min|               0000|        0|
|    max|               Z999|     Z940|
+-------+-------------------+---------+



In [14]:
df.select("PA_CIDPRI", "PA_CIDSEC").show()

+---------+---------+
|PA_CIDPRI|PA_CIDSEC|
+---------+---------+
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
|     0000|     0000|
+---------+---------+
only showing top 20 rows



In [15]:
df.groupby('PA_CIDPRI').count().sort('count').show()

+---------+-----+
|PA_CIDPRI|count|
+---------+-----+
|     Q201|    1|
|     P282|    1|
|     Q719|    1|
|     Y548|    1|
|     Z739|    1|
|     L505|    1|
|      T36|    1|
|     N140|    1|
|     G951|    1|
|      R71|    1|
|      B05|    1|
|     P201|    1|
|     H311|    1|
|     W318|    1|
|     O046|    1|
|      O12|    1|
|     B403|    1|
|     C922|    1|
|     T855|    1|
|     L658|    1|
+---------+-----+
only showing top 20 rows



In [8]:
df.groupby('PA_CIDSEC').count().show()

+---------+-----+
|PA_CIDSEC|count|
+---------+-----+
|     H904|  663|
|     C718|    3|
|     C491|    2|
|      Z21|    2|
|     H906| 1012|
|     C676|    2|
|     G710|   26|
|      E83|    2|
|     N110|    4|
|     M179|    1|
|      C22|   16|
|     E142|   12|
|     I071|    1|
|     C186|   15|
|      C77|  211|
|     C159|   26|
|     N039|   27|
|     C248|    2|
|     C509|  673|
|     N189|  357|
+---------+-----+
only showing top 20 rows



In [10]:
df.count()

40722508

In [13]:
from pyspark.sql.functions import expr

# df.selectExpr('add_one(v1)').show()
df.select(expr('PA_CIDPRI') == "H57").show()

+-----------------+
|(PA_CIDPRI = H57)|
+-----------------+
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
|            false|
+-----------------+
only showing top 20 rows



In [8]:
df.filter(df["PA_CIDPRI"] == "H57").show()

+-------+---------+---------+--------+--------+---------+---------+--------+---------+---------+--------------+--------------+----------+------+------+----------+--------+---------+---------+----------+-------------+---------------+---------+---------+--------+---------+---------+-------+---------+---------+---------+---------+---------+--------+--------+--------+----------+-------+----------+---------+---------+---------+---------+---------+--------+--------+----------+----------+---------+---------+---------+-------+-------+--------+--------+--------+---------+--------+------+----------+
|  index|pa_gestao|pa_condic|pa_ufmun|pa_regct|pa_incout|pa_incurg|pa_tpups|pa_tippre|pa_mn_ind|    pa_cnpjcpf|    pa_cnpjmnt|pa_cnpj_cc|pa_mvm|pa_cmp|pa_proc_id|pa_tpfin|pa_subfin|pa_nivcpl|pa_docorig|   pa_autoriz|      pa_cnsmed|pa_cbocod|pa_motsai|pa_obito|pa_encerr|pa_perman|pa_alta|pa_transf|pa_cidpri|pa_cidsec|pa_cidcas|pa_catend|pa_idade|idademin|idademax|pa_flidade|pa_sexo|pa_racacor|pa_munp

In [11]:
h57 = df.where('PA_CIDPRI like "H57%"') #.show()

In [12]:
h57.write.saveAsTable("sample_h57")

In [18]:
sqlContext.sql("show tables").show()

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|sample_h57|      false|
|        |    tablea|       true|
+--------+----------+-----------+



In [15]:
# Properties to connect to the database,
# the JDBC driver is part of our build.sbt
dbConnectionUrl = "jdbc:postgresql://localhost:5432/datasus"
mode = "overwrite"
table = "h57"
properties = {"user": "postgres","password": "postgres","driver": "org.postgresql.Driver"}
# Write in a table called ch02
h57.write.jdbc(url=dbConnectionUrl, table=table, mode=mode, properties=properties)

In [16]:
sqlContext.sql("select count(1) from sample_h57").show()

+--------+
|count(1)|
+--------+
|   61793|
+--------+



In [17]:
spark.sql("SHOW TABLES")

DataFrame[database: string, tableName: string, isTemporary: boolean]

In [18]:
# df.select("*").toPandas()

# End

In [15]:
spark.stop()

In [None]:
# format2018df = load_data_using_2018_format(spark,"2018*.csv")

# format2015df = load_data_using_2015_format(spark,"2015*.csv")

# format2006df = load_data_using_2006_format(spark,"200*.csv")

# format2006df2 = load_data_using_2006_format(spark,"2012*.csv")

# df = format2018df.unionByName(format2015df).unionByName(format2006df).unionByName(format2006df2)
