In [1]:
import csv
import json
import os
import time
from datetime import datetime

import pyspark
import requests

In [2]:
# Build (or reuse) a local Spark session: master 'local[16]', 8g driver memory.
spark = (
    pyspark.sql.SparkSession.builder
    .master('local[16]')
    .appName('covid19')
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)

In [3]:
# assuming cwd is repo root.
# Read the whole CSV into memory as text. The encoding is passed explicitly so
# the decoded content does not depend on the platform's default locale encoding.
with open('data/confirmed_mar27_2020.csv', 'r', encoding='utf-8') as fh:
    confirmed_fc = fh.read()

In [4]:
# Break the raw text into one string per line, then report both sizes.
# Note: splitting on '\n' leaves a trailing '' element when the text ends with
# a newline; a later cell removes it.
rows = confirmed_fc.split('\n')
print(f'file len: {len(confirmed_fc)}')
print(f'rows: {len(rows)}')

file len: 48844
rows: 251


In [5]:
# Inspect the second-to-last element of rows.
rows[-2]

',Burma,21.9162,95.956,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,8'

In [6]:
# Inspect the final element of rows (an empty string when the text ended with '\n').
rows[-1]

''

In [7]:
# Drop the last line when it is just an empty row (left by the final newline).
# The value is checked instead of deleting a hard-coded index, so this cell is
# safe to re-run and does not depend on the file having exactly 251 lines.
if rows and rows[-1] == '':
    del rows[-1]

In [8]:
# Distribute the list of line strings as an RDD through the session's SparkContext.
conf_rdd = spark.sparkContext.parallelize(rows)

In [9]:
# conf_rdd.collect()
# conf_rdd.count()

In [10]:
# zipWithIndex yields (line, index) pairs; flip each one to (index, line).
conf_rdd_idx = (
    conf_rdd
    .zipWithIndex()
    .map(lambda line_and_pos: (line_and_pos[1], line_and_pos[0]))
)

In [11]:
# Preview the first 3 (index, line) pairs.
conf_rdd_idx.take(3)

[(0,
  'Province/State,Country/Region,Lat,Long,1/22/20,1/23/20,1/24/20,1/25/20,1/26/20,1/27/20,1/28/20,1/29/20,1/30/20,1/31/20,2/1/20,2/2/20,2/3/20,2/4/20,2/5/20,2/6/20,2/7/20,2/8/20,2/9/20,2/10/20,2/11/20,2/12/20,2/13/20,2/14/20,2/15/20,2/16/20,2/17/20,2/18/20,2/19/20,2/20/20,2/21/20,2/22/20,2/23/20,2/24/20,2/25/20,2/26/20,2/27/20,2/28/20,2/29/20,3/1/20,3/2/20,3/3/20,3/4/20,3/5/20,3/6/20,3/7/20,3/8/20,3/9/20,3/10/20,3/11/20,3/12/20,3/13/20,3/14/20,3/15/20,3/16/20,3/17/20,3/18/20,3/19/20,3/20/20,3/21/20,3/22/20,3/23/20,3/24/20,3/25/20,3/26/20,3/27/20'),
 (1,
  ',Afghanistan,33.0,65.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,1,1,1,1,1,1,1,4,4,5,7,7,7,11,16,21,22,22,22,24,24,40,40,74,84,94,110'),
 (2,
  ',Albania,41.1533,20.1683,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,10,12,23,33,38,42,51,55,59,64,70,76,89,104,123,146,174,186')]

In [12]:
# Parse each line with the csv module rather than str.split(','), so a quoted
# field that itself contains a comma (e.g. "Korea, South") stays a single column
# instead of shifting every later value one position to the right.
# * is the splat operator here (similar to spread in js): it unpacks the parsed
# fields into the tuple right after the row index.
conf_rdd_splat = conf_rdd_idx.map(
    lambda rec: (rec[0], *next(csv.reader([rec[1]])))
)

In [13]:
# conf_rdd_splat.take(3)

In [14]:
# A DataFrame is an RDD plus a schema. No schema is supplied here, so Spark
# infers one and falls back to positional column names (_1, _2, ...).
conf_df = spark.createDataFrame(conf_rdd_splat)

In [15]:
# this will look bad if word wrap is enabled.
# conf_df.show(3)

In [16]:
# Show 20 rows, limited to the first 3 and the last 8 columns so the table
# stays narrow enough to read.
conf_df.select(
    *conf_df.columns[:3],
    *conf_df.columns[-8:],
).show(n=20)

+---+--------------------+-------------------+-------+-------+-------+-------+-------+-------+-------+-------+
| _1|                  _2|                 _3|    _64|    _65|    _66|    _67|    _68|    _69|    _70|    _71|
+---+--------------------+-------------------+-------+-------+-------+-------+-------+-------+-------+-------+
|  0|      Province/State|     Country/Region|3/20/20|3/21/20|3/22/20|3/23/20|3/24/20|3/25/20|3/26/20|3/27/20|
|  1|                    |        Afghanistan|     24|     24|     40|     40|     74|     84|     94|    110|
|  2|                    |            Albania|     70|     76|     89|    104|    123|    146|    174|    186|
|  3|                    |            Algeria|     90|    139|    201|    230|    264|    302|    367|    409|
|  4|                    |            Andorra|     75|     88|    113|    133|    164|    188|    224|    267|
|  5|                    |             Angola|      1|      2|      2|      3|      3|      3|      4|      4|
|