In [None]:
# Turn on the IPCompleter.greedy option for this IPython session.
%config IPCompleter.greedy=True

In [None]:
# URLs of the covidtracking.com `states/current` resource, in JSON and CSV form.
# Both go over HTTPS so neither download travels as plain HTTP.
covidjson = 'https://covidtracking.com/api/v1/states/current.json'
covidcsv = 'https://covidtracking.com/api/v1/states/current.csv'

In [None]:
# Download json files and deserialize into python object
import requests
import json

def download_json(url, timeout=30):
    """Fetch ``url`` and return its body decoded from JSON.

    Args:
        url: Address of the JSON document to download.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The Python object produced by decoding the response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within ``timeout``.
        json.JSONDecodeError: If the body is not valid JSON.
    """
    with requests.Session() as session:
        response = session.get(url, timeout=timeout)
        # Fail loudly on an error status instead of trying to parse an
        # error page as JSON.
        response.raise_for_status()
        return json.loads(response.text)

In [None]:
# Download a CSV file, save it to disk, and return the local filename
def download_csv(url, filename, timeout=30):
    """Download ``url`` and save the raw response bytes to ``filename``.

    Args:
        url: Address of the file to download.
        filename: Local path the response body is written to (binary mode).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``filename``, unchanged, so the call can be used inline.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    with requests.Session() as session:
        response = session.get(url, timeout=timeout)
        response.raise_for_status()

    # Open the target only after the download succeeded, so a failed request
    # neither truncates an existing file nor leaves an empty one behind.
    with open(filename, 'wb') as fd:
        fd.write(response.content)
    return filename

In [None]:
# Download the CSV to 'coviddata.csv'; the returned filename is reused by later cells.
csv_current_filename = download_csv(covidcsv, 'coviddata.csv')

In [None]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

In [None]:
# Pipeline that the following cells attach transforms to, run by InteractiveRunner.
p = beam.Pipeline(runner=InteractiveRunner())

In [None]:
# Seed the pipeline with the decoded JSON payload.
# download_json() is called right here, when this cell runs, not when the pipeline executes.
data_from_json = p | 'Data from json' >> beam.Create(download_json(covidjson))

In [None]:
from apache_beam.runners.interactive import interactive_beam as ib

In [None]:
# Display the JSON-backed PCollection in the notebook.
ib.show(data_from_json)

In [None]:
# Same IPCompleter.greedy setting as the notebook's first cell, applied again here.
%config IPCompleter.greedy=True

In [None]:
# Read the downloaded CSV as lines of text. skip_header_lines is a line count,
# so pass 1 explicitly rather than relying on True being coerced to 1.
data_from_csv = p | 'Data from csv' >> beam.io.ReadFromText(csv_current_filename, skip_header_lines=1)

In [None]:
# Display the raw CSV lines (header skipped) in the notebook.
ib.show(data_from_csv)

In [None]:
# Read the header row from the CSV file. Column names may contain commas inside quotes, so parse the line with csv.reader instead of str.split.
from csv import reader

def read_header(csv_file):
    """Return the column names found on the first line of ``csv_file``.

    The first line is stripped of surrounding whitespace and then parsed
    with ``csv.reader``, so a quoted name containing commas stays in one
    piece.
    """
    with open(csv_file, 'r') as handle:
        first_line = handle.readline()

    # csv.reader wants an iterable of lines; feed it just the header.
    rows = reader([first_line.strip()])
    return next(rows)

In [None]:
# Column names taken from the first line of the downloaded CSV.
headers = read_header(csv_current_filename)

# Bare expression: echoes the list as the cell output.
headers

In [None]:
# Create a DoFn that parses the columns of each CSV line
from collections import namedtuple

# Record type with one field per CSV column.
# rename=True: a header that is not a valid Python identifier (for example one
# containing a comma or a space) or that repeats an earlier one is replaced by
# a positional name (_0, _1, ...) instead of making namedtuple raise ValueError.
UsCovidData = namedtuple('UsCovidData', headers, rename=True)

class ParseColumns(beam.DoFn):
    """DoFn that turns one CSV text line into a ``schema`` record.

    Args:
        schema: Callable invoked with one positional argument per parsed
            column (for example a namedtuple class). Its return value is
            the element emitted for the line.
    """

    def __init__(self, schema):
        super().__init__()
        self._schema = schema

    @staticmethod
    def _coerce(text):
        """Return ``int(text)`` for all-decimal strings, else ``text`` unchanged."""
        # isdecimal() only accepts characters int() can parse. isdigit() is
        # also true for e.g. superscript digits, on which int() raises
        # ValueError.
        # NOTE(review): signed values such as "-5" stay strings, and codes
        # with leading zeros such as "01" become the int 1 - confirm both are
        # intended.
        return int(text) if text.isdecimal() else text

    # Override
    def process(self, element):
        """Parse ``element`` (one CSV line) and emit a single schema record.

        Raises:
            TypeError: If the number of parsed columns does not match what
                ``schema`` accepts.
        """
        fields = next(reader([element]))
        values = [self._coerce(field) for field in fields]

        # 1 to 1 mapping
        return [self._schema(*values)]

In [None]:
# Convert each raw CSV line into a UsCovidData record (one output per input line).
current_data = data_from_csv | 'ParseCsv' >> beam.ParDo(ParseColumns(UsCovidData) )

In [None]:
# Display the parsed records in the notebook.
ib.show(current_data)

In [None]:
# Pull the parsed PCollection into memory and summarize it.
# NOTE(review): ib.collect is expected to return a pandas DataFrame, which is
# what .describe() relies on - confirm against the installed Beam version.
df = ib.collect(current_data)
df.describe()

In [None]:
# Show the parsed records again, this time with visualize_data=True.
ib.show(current_data, visualize_data=True)