# 1- Apresentação notebook

O notebook no Databricks funciona como um Notebook Jupyter, mas com algumas funcionalidades adicionais, para Big Data.<br>
Primeiro vamos explorar ele um pouco.

Ele possibilita comandos direto no file system, com o comando $fs.

In [0]:
%fs ls

path,name,size,modificationTime
dbfs:/FileStore/,FileStore/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/user/,user/,0,0


In [0]:
%fs ls dbfs:/databricks-datasets/

path,name,size,modificationTime
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0


O notebook já vem configurado e você pode utilizar uma das seguintes linguagens:
- Python
- Scala
- SQL
- R

In [0]:
%fs ls dbfs:/databricks-datasets/COVID/coronavirusdataset/

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/coronavirusdataset/.DS_Store,.DS_Store,6148,1594102716000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Case.csv,Case.csv,11711,1595191979000
dbfs:/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv,PatientInfo.csv,488859,1595191979000
dbfs:/databricks-datasets/COVID/coronavirusdataset/PatientRoute.csv,PatientRoute.csv,718510,1594102718000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Policy.csv,Policy.csv,5713,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Region.csv,Region.csv,19082,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/SearchTrend.csv,SearchTrend.csv,71722,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/SeoulFloating.csv,SeoulFloating.csv,49682281,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Time.csv,Time.csv,6604,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/TimeAge.csv,TimeAge.csv,27114,1595191981000


In [0]:
%sql
DROP TABLE IF EXISTS cases;

CREATE TABLE cases
USING csv
OPTIONS (path "/databricks-datasets/COVID/coronavirusdataset/Case.csv", header "true")

In [0]:
%sql
SELECT * FROM cases

case_id,province,city,group,infection_case,confirmed,latitude,longitude
1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374
1000006,Seoul,Guro-gu,True,Manmin Central Church,41,37.481059,126.894343
1000007,Seoul,from other city,True,SMR Newly Planted Churches Group,36,-,-
1000008,Seoul,Dongdaemun-gu,True,Dongan Church,17,37.592888,127.056766
1000009,Seoul,from other city,True,Coupang Logistics Center,25,-,-
1000010,Seoul,Gwanak-gu,True,Wangsung Church,30,37.481735,126.930121


In [0]:
%sql
SELECT COUNT(*) FROM cases;

count(1)
174


# 2- PySpark

## 2.1- SparkContext

O PySpark já está pré-instalado no ambiente do Databricks.<br>
O primeiro passo é criar a conexão a um cluster master. Isso é feito criando-se um objeto da classe SparkContext.<br>

sc = SparkContext.getOrCreate()

Como o Databricks já vem pré-instalado e configurado, não precisamos fazer isso, mas podemos visualizá-lo.

In [0]:
# Em seu computador ou outro ambiente, deve rodar os comandos (após instalá-los no sistema):
'''
import findspark
findspark.init()

from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
'''

In [0]:
sc

## 2.2- SparkSession

O passo seguinte é estabelecer uma interface ao cluster, usando o SparkSession.<br>
Este também já é criado automaticamente, mas podemos criar um, em separado, sempre que quisermos.

In [0]:
"""
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
"""

In [0]:
# SparkSession criado pelo Databricks
spark

In [0]:
# Mostra as tabelas armazenadas
spark.catalog.listTables()

Out[3]: [Table(name='cases', database='default', description=None, tableType='EXTERNAL', isTemporary=False)]

## 2.3- Leitura dos dados:

Para fazer o upload de dados do seu PC, vá em Create - Table e na aba Upload File, envie o arquivo.<br>
Após fazer o upload do arquivo, ele estará no sistema e você poderá importar pelo Spark.

### 2.3.1- Spark DataFrame

O Spark possibilita trabalhar com os dados de duas maneiras. A primeira é através do Spark DataFrame, uma estrutura local, semelhante ao DF do pandas. Iremos explorar um pouco ele agora.

In [0]:
# Arquivos upados estarão armazenados no FileStore
%fs ls /FileStore/tables/

path,name,size,modificationTime
dbfs:/FileStore/tables/AllBroadcasts.csv,AllBroadcasts.csv,22178362,1655660919000
dbfs:/FileStore/tables/country_vaccinations.csv,country_vaccinations.csv,6801916,1627998693000


In [0]:
# Leitura de dados de Covid (no dataset do databricks)
df = spark.read.csv("/databricks-datasets/COVID/coronavirusdataset/Case.csv", header="true", inferSchema="true")

In [0]:
df

Out[5]: DataFrame[UserId: string, UUID: bigint, Extras: string, Action: string, timestamp: string]

In [0]:
df.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

In [0]:
# Schema do df em formato de árvore
df.printSchema()

root
 |--  case_id: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- group: boolean (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



### 2.3.2- Tabelas no RDD

A segunda forma de lidar com dados é utilizando o seu sistema de armazenamento em tabelas no seu catálogo (Catalog).<br>
Você interage com essas tabelas usando comandos SQL, semelhante ao que o Hive fazia.<br>
As tabelas/views no catálogo são a única forma de compartilhar dados com outros clusters.<br>
Os views temporários são apagados quando o cluster é desligado (algo como uma tabela temporária).

In [0]:
# Criar uma view temporária SQL a partir do DataFrame
df.createOrReplaceTempView("temp_view")

In [0]:
# Mostra as views no RDD
spark.catalog.listTables()

Out[12]: [Table(name='cases', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='temp_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [0]:
# O Spark possibilita comandos SQL da view do RDD
sqlDF = spark.sql("SELECT * FROM temp_view LIMIT 5")
sqlDF.show()

+--------+--------+------------+-----+--------------------+---------+---------+----------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|   Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
+--------+--------+------------+-----+--------------------+---------+---------+----------+



In [0]:
# Criar uma DataFrame a partir de uma view SQL
spark_df = spark.table("temp_view")
spark_df.show()

+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| case_id|province|           city|group|      infection_case|confirmed| latitude| longitude|
+--------+--------+---------------+-----+--------------------+---------+---------+----------+
| 1000001|   Seoul|     Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|
| 1000002|   Seoul|      Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|
| 1000003|   Seoul|        Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|
| 1000004|   Seoul|   Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|
| 1000005|   Seoul|      Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|
| 1000006|   Seoul|        Guro-gu| true|Manmin Central Ch...|       41|37.481059|126.894343|
| 1000007|   Seoul|from other city| true|SMR Newly Planted...|       36|        -|         -|
| 1000008|   Seoul|  Dongdaemun-gu| true|       Dongan Churc

In [0]:
# Convertendo tabela SQL em Pandas DataFrame
query = "SELECT * FROM temp_view"
pandas_df = spark.sql(query).toPandas()
pandas_df.head()

Unnamed: 0,case_id,province,city,group,infection_case,confirmed,latitude,longitude
0,1000001,Seoul,Yongsan-gu,True,Itaewon Clubs,139,37.538621,126.992652
1,1000002,Seoul,Gwanak-gu,True,Richway,119,37.48208,126.901384
2,1000003,Seoul,Guro-gu,True,Guro-gu Call Center,95,37.508163,126.884387
3,1000004,Seoul,Yangcheon-gu,True,Yangcheon Table Tennis Club,43,37.546061,126.874209
4,1000005,Seoul,Dobong-gu,True,Day Care Center,43,37.679422,127.044374


In [0]:
type(pandas_df)

Out[16]: pandas.core.frame.DataFrame

In [0]:
type(spark_df)

Out[17]: pyspark.sql.dataframe.DataFrame

In [0]:
# Convertendo Pandas DataFrame para Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
type(spark_df)

Out[18]: pyspark.sql.dataframe.DataFrame

## 2.4- Manipulando os dados

Vamos ver agora como manipular nossos dados, construir tabelas, dar join, etc.<br>
Spark DataFrames são imutáveis. Isso significa que sempre que uma operação é realizada, como inserção ou modificação de colunas, um novo DataFrame é criado.

Para manipular colunas, usamos o comando df.withColumn("col_nova", valor_col_nova)

In [0]:
df = df.withColumn("confirmed_by_100", df.confirmed/10)
df.show(5)

+--------+--------+------------+-----+--------------------+---------+---------+----------+----------------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|confirmed_by_100|
+--------+--------+------------+-----+--------------------+---------+---------+----------+----------------+
| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|            13.9|
| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|            11.9|
| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|             9.5|
| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|             4.3|
| 1000005|   Seoul|   Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|             4.3|
+--------+--------+------------+-----+--------------------+---------+---------+----------+----------------+
only showing top 5 rows



In [0]:
# O comando df.withColumn() também pode ser usado para atualizar/sobrescrever uma coluna.
df = df.withColumn("confirmed_by_100", df.confirmed_by_100/10)
df.show(5)

+--------+--------+------------+-----+--------------------+---------+---------+----------+------------------+
| case_id|province|        city|group|      infection_case|confirmed| latitude| longitude|  confirmed_by_100|
+--------+--------+------------+-----+--------------------+---------+---------+----------+------------------+
| 1000001|   Seoul|  Yongsan-gu| true|       Itaewon Clubs|      139|37.538621|126.992652|1.3900000000000001|
| 1000002|   Seoul|   Gwanak-gu| true|             Richway|      119| 37.48208|126.901384|              1.19|
| 1000003|   Seoul|     Guro-gu| true| Guro-gu Call Center|       95|37.508163|126.884387|              0.95|
| 1000004|   Seoul|Yangcheon-gu| true|Yangcheon Table T...|       43|37.546061|126.874209|              0.43|
| 1000005|   Seoul|   Dobong-gu| true|     Day Care Center|       43|37.679422|127.044374|              0.43|
+--------+--------+------------+-----+--------------------+---------+---------+----------+------------------+
only showi

### 2.4.1- Comandos básicos Spark DataFrame

In [0]:
df.select("city")

Out[22]: DataFrame[city: string]

In [0]:
# Mostrar uma coluna
df.select("city").show(10)

+---------------+
|           city|
+---------------+
|     Yongsan-gu|
|      Gwanak-gu|
|        Guro-gu|
|   Yangcheon-gu|
|      Dobong-gu|
|        Guro-gu|
|from other city|
|  Dongdaemun-gu|
|from other city|
|      Gwanak-gu|
+---------------+
only showing top 10 rows



In [0]:
# Mostrar várias colunas com operação
df.select(df["city"], df["confirmed"]/100).show(10)

+---------------+-----------------+
|           city|(confirmed / 100)|
+---------------+-----------------+
|     Yongsan-gu|             1.39|
|      Gwanak-gu|             1.19|
|        Guro-gu|             0.95|
|   Yangcheon-gu|             0.43|
|      Dobong-gu|             0.43|
|        Guro-gu|             0.41|
|from other city|             0.36|
|  Dongdaemun-gu|             0.17|
|from other city|             0.25|
|      Gwanak-gu|              0.3|
+---------------+-----------------+
only showing top 10 rows



In [0]:
# Filtrar com condição
# Aceita sintaxe df['col'], ou df.col, ou expressão SQL entre ' '/" "

df.filter(df['confirmed'] > 400).show()
df.filter(df.confirmed > 400).show()
# Ou
df.filter('confirmed > 400').show()

+--------+----------------+---------------+-----+--------------------+---------+--------+---------+-----------------+
| case_id|        province|           city|group|      infection_case|confirmed|latitude|longitude| confirmed_by_100|
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+-----------------+
| 1200001|           Daegu|         Nam-gu| true|  Shincheonji Church|     4511|35.84008| 128.5667|            45.11|
| 1200009|           Daegu|              -|false|contact with patient|      917|       -|        -|             9.17|
| 1200010|           Daegu|              -|false|                 etc|      747|       -|        -|7.470000000000001|
| 6000001|Gyeongsangbuk-do|from other city| true|  Shincheonji Church|      566|       -|        -|             5.66|
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+-----------------+

+--------+----------------+---------------+-----+------

In [0]:
from pyspark.sql.functions import desc

# Agrupar por coluna e ordenar
df.groupBy("city").count().sort(desc("count")).show(10)

+---------------+-----+
|           city|count|
+---------------+-----+
|              -|   53|
|from other city|   51|
|         Seo-gu|    5|
|     Gangnam-gu|    4|
|   Gyeongsan-si|    3|
|        Guro-gu|    3|
|        Jung-gu|    3|
|    Seongnam-si|    3|
|   Yangcheon-gu|    2|
|   Dalseong-gun|    2|
+---------------+-----+
only showing top 10 rows



In [0]:
# Valores min, max, avg
min_cases = df.groupBy("city").min("confirmed")
max_cases = df.groupBy("city").max("confirmed")
avg_cases = df.groupBy("city").avg("confirmed")

min_cases.show(10)
max_cases.show(10)
avg_cases.show(10)

+---------------+--------------+
|           city|min(confirmed)|
+---------------+--------------+
|     Gangnam-gu|             1|
|     Cheonan-si|           103|
|from other city|             1|
|      Anyang-si|            17|
|      Gwanak-gu|            30|
|     Yongsan-gu|           139|
|        Dong-gu|             5|
|         Sejong|             8|
|     Gangseo-gu|             0|
|       Wonju-si|             4|
+---------------+--------------+
only showing top 10 rows

+---------------+--------------+
|           city|max(confirmed)|
+---------------+--------------+
|     Gangnam-gu|             7|
|     Cheonan-si|           103|
|from other city|           566|
|      Anyang-si|            22|
|      Gwanak-gu|           119|
|     Yongsan-gu|           139|
|        Dong-gu|            39|
|         Sejong|            31|
|     Gangseo-gu|             0|
|       Wonju-si|             4|
+---------------+--------------+
only showing top 10 rows

+---------------+-------

### 2.4.2- Comandos básicos SQL Views

In [0]:
# Mostrar uma coluna
spark.sql("SELECT city from temp_view LIMIT 10").show()

+---------------+
|           city|
+---------------+
|     Yongsan-gu|
|      Gwanak-gu|
|        Guro-gu|
|   Yangcheon-gu|
|      Dobong-gu|
|        Guro-gu|
|from other city|
|  Dongdaemun-gu|
|from other city|
|      Gwanak-gu|
+---------------+



In [0]:
# Mostrar várias colunas com operação
spark.sql("SELECT city, confirmed/100 AS cases FROM temp_view LIMIT 10").show()

+---------------+-----+
|           city|cases|
+---------------+-----+
|     Yongsan-gu| 1.39|
|      Gwanak-gu| 1.19|
|        Guro-gu| 0.95|
|   Yangcheon-gu| 0.43|
|      Dobong-gu| 0.43|
|        Guro-gu| 0.41|
|from other city| 0.36|
|  Dongdaemun-gu| 0.17|
|from other city| 0.25|
|      Gwanak-gu|  0.3|
+---------------+-----+



In [0]:
# Filtrar com condição
spark.sql("SELECT * FROM temp_view WHERE confirmed > 400").show()

+--------+----------------+---------------+-----+--------------------+---------+--------+---------+
| case_id|        province|           city|group|      infection_case|confirmed|latitude|longitude|
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+
| 1200001|           Daegu|         Nam-gu| true|  Shincheonji Church|     4511|35.84008| 128.5667|
| 1200009|           Daegu|              -|false|contact with patient|      917|       -|        -|
| 1200010|           Daegu|              -|false|                 etc|      747|       -|        -|
| 6000001|Gyeongsangbuk-do|from other city| true|  Shincheonji Church|      566|       -|        -|
+--------+----------------+---------------+-----+--------------------+---------+--------+---------+



In [0]:
# Agrupar por coluna e ordenar
spark.sql("SELECT city, COUNT(*) as count FROM temp_view GROUP BY city ORDER BY count DESC LIMIT 10").show()

+---------------+-----+
|           city|count|
+---------------+-----+
|              -|   53|
|from other city|   51|
|         Seo-gu|    5|
|     Gangnam-gu|    4|
|        Jung-gu|    3|
|    Seongnam-si|    3|
|   Gyeongsan-si|    3|
|        Guro-gu|    3|
|      Gwanak-gu|    2|
|   Yangcheon-gu|    2|
+---------------+-----+



In [0]:
# Valores min, max, avg
min_cases = spark.sql("SELECT city, MIN(confirmed) FROM temp_view GROUP BY city LIMIT 10")
max_cases = spark.sql("SELECT city, MAX(confirmed) FROM temp_view GROUP BY city LIMIT 10")
avg_cases = spark.sql("SELECT city, AVG(confirmed) FROM temp_view GROUP BY city LIMIT 10")

min_cases.show()
max_cases.show()
avg_cases.show()

+---------------+--------------+
|           city|min(confirmed)|
+---------------+--------------+
|     Gangnam-gu|             1|
|     Cheonan-si|           103|
|from other city|             1|
|      Anyang-si|            17|
|      Gwanak-gu|            30|
|     Yongsan-gu|           139|
|        Dong-gu|             5|
|         Sejong|             8|
|     Gangseo-gu|             0|
|       Wonju-si|             4|
+---------------+--------------+

+---------------+--------------+
|           city|max(confirmed)|
+---------------+--------------+
|     Gangnam-gu|             7|
|     Cheonan-si|           103|
|from other city|           566|
|      Anyang-si|            22|
|      Gwanak-gu|           119|
|     Yongsan-gu|           139|
|        Dong-gu|            39|
|         Sejong|            31|
|     Gangseo-gu|             0|
|       Wonju-si|             4|
+---------------+--------------+

+---------------+------------------+
|           city|    avg(confirmed)|


### 2.4.3- Joins

Assim como em SQL, podemos realizar Joins entre tabelas.
Vamos realizar join das tabelas PatientID e PatientRoute.

In [0]:
%fs ls dbfs:/databricks-datasets/COVID/coronavirusdataset/

path,name,size,modificationTime
dbfs:/databricks-datasets/COVID/coronavirusdataset/.DS_Store,.DS_Store,6148,1594102716000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Case.csv,Case.csv,11711,1595191979000
dbfs:/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv,PatientInfo.csv,488859,1595191979000
dbfs:/databricks-datasets/COVID/coronavirusdataset/PatientRoute.csv,PatientRoute.csv,718510,1594102718000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Policy.csv,Policy.csv,5713,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Region.csv,Region.csv,19082,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/SearchTrend.csv,SearchTrend.csv,71722,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/SeoulFloating.csv,SeoulFloating.csv,49682281,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/Time.csv,Time.csv,6604,1595191981000
dbfs:/databricks-datasets/COVID/coronavirusdataset/TimeAge.csv,TimeAge.csv,27114,1595191981000


In [0]:
id_df = spark.read.csv("/databricks-datasets/COVID/coronavirusdataset/PatientInfo.csv", header="true", inferSchema="true")
id_df.show(5)

+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|patient_id|   sex|age|country|province|       city|      infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|
+----------+------+---+-------+--------+-----------+--------------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul| Gangseo-gu|     overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|
|1000000002|  male|30s|  Korea|   Seoul|Jungnang-gu|     overseas inflow|       null|            31|              null|2020-01-30 00:00:00|2020-03-02 00:00:00|         null|released|
|1000000003|  male|50s|  Korea|   Seoul|  Jongno-gu|contact with patient| 2002000001|

In [0]:
id_df = id_df.withColumnRenamed('province', 'patient_province')
id_df = id_df.withColumnRenamed('city', 'patient_city')

In [0]:
route_df = spark.read.csv("/databricks-datasets/COVID/coronavirusdataset/PatientRoute.csv", header="true", inferSchema="true")
route_df.show(5)

+----------+-------------------+-----------+------------+--------------------+--------+---------+
|patient_id|               date|   province|        city|                type|latitude|longitude|
+----------+-------------------+-----------+------------+--------------------+--------+---------+
|1000000001|2020-01-22 00:00:00|Gyeonggi-do|    Gimpo-si|             airport|37.61525| 126.7156|
|1000000001|2020-01-24 00:00:00|      Seoul|     Jung-gu|            hospital|37.56724| 127.0057|
|1000000002|2020-01-25 00:00:00|      Seoul| Seongbuk-gu|                 etc|37.59256|  127.017|
|1000000002|2020-01-26 00:00:00|      Seoul| Seongbuk-gu|               store|37.59181| 127.0168|
|1000000002|2020-01-26 00:00:00|      Seoul|Seongdong-gu|public_transporta...|37.56399| 127.0295|
+----------+-------------------+-----------+------------+--------------------+--------+---------+
only showing top 5 rows



In [0]:
patient = id_df.join(route_df, on='patient_id', how='leftouter')
patient.show(5)

+----------+----+---+-------+----------------+------------+---------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+-------------------+-----------+-----------+--------------------+--------+---------+
|patient_id| sex|age|country|patient_province|patient_city| infection_case|infected_by|contact_number|symptom_onset_date|     confirmed_date|      released_date|deceased_date|   state|               date|   province|       city|                type|latitude|longitude|
+----------+----+---+-------+----------------+------------+---------------+-----------+--------------+------------------+-------------------+-------------------+-------------+--------+-------------------+-----------+-----------+--------------------+--------+---------+
|1000000001|male|50s|  Korea|           Seoul|  Gangseo-gu|overseas inflow|       null|            75|        2020-01-22|2020-01-23 00:00:00|2020-02-05 00:00:00|         null|released|2020-01-2