<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#rewritten-version-of-the-SQL-pipeline" data-toc-modified-id="rewritten-version-of-the-SQL-pipeline-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>rewritten version of the SQL pipeline</a></span></li></ul></div>

## rewritten version of the SQL pipeline
reading H5s from the S3 bucket in EMR+pyspark was giving me a headache, so i decided to just preprocess the h5s into easy CSVs and sync them this way. so i'll have... 39k CSV files, which can be read more easily in EMR notebook, and concatenated into one big RDD.

this takes about 2 hours to save locally, probably another half hour to upload to S3. if working with bigger data, i would ideally find a way to wrangle the .h5s directly in spark.

In [2]:
import pandas as pd
import numpy as np
import pandas as pd
import mysql.connector
from tqdm import tqdm
import h5py
from sqlalchemy import types, create_engine 
import os
from os import listdir
from os.path import isfile, join
import config
import time

In [18]:
def get_files_list(): # gets list of filepath strings in ~/data
    mypath = os.getcwd()+'/data/'
    files = [f for f in listdir(mypath) if isfile(join(mypath, f))]
    for f in files:
        f = mypath+f # append full path for h5py loading
    files = list(map(lambda f: mypath+f, files))
    return files 

def process_orbit(h5): # filename string. adds h5's observations to dataframe
    # the formatting/array shape is all uniform. thanks, NASA
    f = h5py.File(h5, 'r') # read file
    geo = f['GEOLOCATION_DATA'] # h5 Groups architecture is similar to dict
    sci = f['SCIENCE_DATA']
    # hdf.Datasets -> np.array -> flatten -> list. faster than looping through each matrix
    lat = list(geo['Latitude'].value.ravel())
    long = list(geo['Longitude'].value.ravel()) # is there a less verbose way to do this?
    sat_lat = list(geo['SpacecraftLatitude'].value.ravel())*36 # extend the 1d arrays
    sat_long = list(geo['SpacecraftLongitude'].value.ravel())*36 # don't forget to ravel like i did
    sat_alt = list(geo['SpacecraftAltitude'].value.ravel())*36
    time = list(geo['TimeUTC'].value.ravel())*36 # 36 measurements per position means one "time" value for every consecutive 36 measurements
    sza = list(geo['SolarZenithAngle'].value.ravel())
    pbl = list(sci['ColumnAmountSO2_PBL'].value.ravel())
    anom = list(sci['Flag_SAA'].value.ravel())
    volc = list(sci['Flag_SO2'].value.ravel())
    # combine lists into df
    new = pd.DataFrame(list(zip(lat, long, sat_lat, sat_long, sat_alt, time, sza, pbl, anom, volc)),
                       columns=["lat", "long", "sat_lat", "sat_long", 
                                'sat_alt', "time", "sza", "pbl", "anom", 'volc'])
    new['time'] = new['time'].astype(str) # change time format
    new['time'] = new['time'].apply(lambda st: st[2:12]+' '+st[13:21]) # change time format
    return new # returns new df

def make_engine():
    user = config.user # substitute your own username, password & SQL server host
    pw = config.pw
    host = config.host
    db = config.db
    connst = f'mysql+pymysql://{user}:{pw}@{host}/{db}'
    engine = create_engine(connst, echo=False) # don't set pool_recycle
    return engine

# rewrote this method to save to local CSVs instead of pushing to SQL
# since we're just gonna "aws s3 sync" our CSVs to the S3 bucket for spark processing
def process_h5s(files, engine): # filepath strings
    # to keep track of progress
    files = get_files_list() # get filepath strings
#     engine = 
    for c,f in enumerate(tqdm(files)): # this took 18 hours for 40,000 11k-length h5 files on my laptop
        df = process_orbit(f)
        df.to_csv(str(c)+'.csv')
#         df.to_sql('so2', con=engine, if_exists='append') # sqlalchemy takes care of sessions, commits etc rather nicely
    return 

def setup(): # get engine and filestring list
    print('establishing engine...')
    engine = make_engine() # establish connection to server
    files = get_files_list() # get filepath strings
    return engine, files

In [19]:

# get engine & list of filepathstrings in /data
engine, files = setup()
print(f'processing {len(files)} files...')
# modified process method, save to /csv_data
process_h5s(files, engine) # process each file & upload to server

establishing engine...
processing 39629 files...




  0%|          | 0/39629 [00:00<?, ?it/s][A[A

  0%|          | 1/39629 [00:00<2:08:20,  5.15it/s][A[A

  0%|          | 2/39629 [00:00<2:06:19,  5.23it/s][A[A

  0%|          | 3/39629 [00:00<2:04:47,  5.29it/s][A[A

  0%|          | 4/39629 [00:00<2:10:54,  5.04it/s][A[A

  0%|          | 5/39629 [00:00<2:11:49,  5.01it/s][A[A

  0%|          | 6/39629 [00:01<2:13:55,  4.93it/s][A[A

  0%|          | 7/39629 [00:01<2:11:52,  5.01it/s][A[A

  0%|          | 8/39629 [00:01<2:12:50,  4.97it/s][A[A

  0%|          | 9/39629 [00:01<2:16:44,  4.83it/s][A[A

  0%|          | 10/39629 [00:02<2:17:56,  4.79it/s][A[A

  0%|          | 11/39629 [00:02<2:21:32,  4.66it/s][A[A

  0%|          | 12/39629 [00:02<2:34:21,  4.28it/s][A[A

  0%|          | 13/39629 [00:02<2:31:24,  4.36it/s][A[A

  0%|          | 14/39629 [00:02<2:24:44,  4.56it/s][A[A

  0%|          | 15/39629 [00:03<2:22:56,  4.62it/s][A[A

  0%|          | 16/39629 [00:03<2:21:57,  4.65it/s][A[A

  0%|          | 136/39629 [00:28<2:21:44,  4.64it/s][A[A

  0%|          | 137/39629 [00:28<2:21:47,  4.64it/s][A[A

  0%|          | 138/39629 [00:29<2:21:53,  4.64it/s][A[A

  0%|          | 139/39629 [00:29<2:21:48,  4.64it/s][A[A

  0%|          | 140/39629 [00:29<2:21:45,  4.64it/s][A[A

  0%|          | 141/39629 [00:29<2:20:18,  4.69it/s][A[A

  0%|          | 142/39629 [00:30<2:20:26,  4.69it/s][A[A

  0%|          | 143/39629 [00:30<2:21:19,  4.66it/s][A[A

  0%|          | 144/39629 [00:30<2:21:47,  4.64it/s][A[A

  0%|          | 145/39629 [00:30<2:21:39,  4.65it/s][A[A

  0%|          | 146/39629 [00:30<2:20:50,  4.67it/s][A[A

  0%|          | 147/39629 [00:31<2:15:38,  4.85it/s][A[A

  0%|          | 148/39629 [00:31<2:16:51,  4.81it/s][A[A

  0%|          | 149/39629 [00:31<2:30:58,  4.36it/s][A[A

  0%|          | 150/39629 [00:31<2:28:25,  4.43it/s][A[A

  0%|          | 151/39629 [00:31<2:26:55,  4.48it/s][A[A

  0%|          | 152/396

  1%|          | 271/39629 [00:57<2:19:28,  4.70it/s][A[A

  1%|          | 272/39629 [00:57<2:20:28,  4.67it/s][A[A

  1%|          | 273/39629 [00:57<2:21:07,  4.65it/s][A[A

  1%|          | 274/39629 [00:58<2:21:54,  4.62it/s][A[A

  1%|          | 275/39629 [00:58<2:21:59,  4.62it/s][A[A

  1%|          | 276/39629 [00:58<2:04:08,  5.28it/s][A[A

  1%|          | 277/39629 [00:58<2:09:05,  5.08it/s][A[A

  1%|          | 278/39629 [00:58<2:12:35,  4.95it/s][A[A

  1%|          | 279/39629 [00:59<2:16:00,  4.82it/s][A[A

  1%|          | 280/39629 [00:59<2:18:06,  4.75it/s][A[A

  1%|          | 281/39629 [00:59<2:18:53,  4.72it/s][A[A

  1%|          | 282/39629 [00:59<2:19:04,  4.72it/s][A[A

  1%|          | 283/39629 [01:00<2:15:24,  4.84it/s][A[A

  1%|          | 284/39629 [01:00<2:15:20,  4.85it/s][A[A

  1%|          | 285/39629 [01:00<2:17:02,  4.78it/s][A[A

  1%|          | 286/39629 [01:00<2:18:45,  4.73it/s][A[A

  1%|          | 287/396

OSError: Unable to open file (file signature not found)

In [17]:
df = pd.DataFrame({'name': ['Raphael', 'Donatello'],
                   'mask': ['red', 'purple'],
                   'weapon': ['sai', 'bo staff']})
df.to_csv('turtles.csv')