# Initialize Environment

### Compatibility notes

The following code is only compatible with a Python 3.4 kernel, and the notebook must be opened with IPython 3.0+.

In [None]:
from pdb import set_trace as debug
from pandas.io.parsers import read_csv
import pandas
from IPython.parallel import Client,require
from collections import Counter

In [None]:
import datetime
import re
import csv
import gzip
import ujson

In [None]:
# User-editable settings for this notebook.
#--> DUMP_DATE timestamp string embedded in the output file names (see OUT_BASE)
#--> RAW_FILE_DIR with the directory that contains amazon__main.json.gz
#--> OUT_DIR with the directory of where you want the processed files to go
#--> DEBUG whether to print out created CSV files or not
#--> COMPRESS_LEVEL gzip compression level. warning: severely impacts runtime.
#--> LINE_LIMIT how many lines to iterate through in the raw file. For debugging.
#    (a value <= 0 disables the limit in the pure-Python reader)
#--> PARALLEL when truthy, connect to the IPython.parallel cluster and replace
#    `map` with a load-balanced, non-blocking version
#--> PIGZ selects which iter_json_gzip implementation gets defined
#--> OBS_SKIP / OBS_PRINT the sample cell prints the last OBS_PRINT lines of the
#    first OBS_SKIP lines of the raw file
DUMP_DATE = '2015_02_26_23_12'
RAW_FILE_DIR = '/home/cgaray/data'
OUT_DIR = '/home/cgaray/data/out'
DEBUG = 1
COMPRESS_LEVEL = 9
LINE_LIMIT = 1000
PARALLEL = 1
PIGZ = 1
OBS_SKIP = 100
OBS_PRINT = 5

In [None]:
# Derived paths -- don't change these!
# RAW_FILE is the gzipped raw JSON dump inside RAW_FILE_DIR.
RAW_FILE = '/'.join([RAW_FILE_DIR, 'amazon__main.json.gz'])
# OUT_BASE is an output-path template: the leading '{}' is deliberately left
# unformatted here (presumably filled in later with a table name -- confirm).
OUT_BASE = '/'.join([OUT_DIR, '{}__amazon__main__' + DUMP_DATE + '.csv.gz'])

In [None]:
if PIGZ:
    # NOTE(review): the standard-library gzip module has no `iter_json_gzip`
    # attribute; this branch presumably expects a custom/patched module to be
    # importable as `gzip` on the engines -- confirm before relying on PIGZ.
    @require(gzip)
    def iter_json_gzip(filename,LINE_LIMIT=LINE_LIMIT):
        """Delegate reading of `filename` to `gzip.iter_json_gzip`.

        Returns whatever that call returns; `LINE_LIMIT` is forwarded as-is.
        """
        return gzip.iter_json_gzip(filename,LINE_LIMIT=LINE_LIMIT)
else:
    @require(gzip,ujson)
    def iter_json_gzip(filename,LINE_LIMIT=LINE_LIMIT):
        """Yield the JSON objects stored one per line in a gzipped text file.

        Only lines that decode to a dict containing both 'app_id' and
        'timestamp' are yielded; blank lines and other JSON values are skipped.

        filename   -- path of the gzip-compressed file to read.
        LINE_LIMIT -- maximum number of lines to read from the file; a value
                      <= 0 means "no limit".

        Raises ValueError (from ujson) if a non-blank line is not valid JSON.
        """
        with gzip.open(filename,'rt') as file_iter:
            for c,line in enumerate(file_iter):
                # c is zero-based, so c == LINE_LIMIT means LINE_LIMIT lines
                # have already been read (the old `c > LINE_LIMIT` read one extra).
                if LINE_LIMIT > 0 and c >= LINE_LIMIT:
                    break
                # Lines keep their trailing newline, so test the stripped text;
                # feeding a blank line to ujson.loads would raise.
                if not line.strip():
                    continue
                out = ujson.loads(line)
                if isinstance(out,dict) and 'app_id' in out and 'timestamp' in out:
                    yield out

# Print Sample Observations

In [None]:
# Best-effort preview of the raw data: of the first OBS_SKIP lines of the
# gzipped file, print the last OBS_PRINT. Failures (missing `sh` package,
# missing file, ...) must not stop the notebook, but they are reported instead
# of being silently discarded.
try:
    from sh import zcat,head,tail
    # NOTE(review): RAW_FILE_NEWDATA is not defined in the visible cells; the
    # old bare `except` hid the resulting NameError. Use it when some other
    # cell defines it, otherwise fall back to RAW_FILE.
    sample_source = globals().get('RAW_FILE_NEWDATA', RAW_FILE)
    print(tail(head(zcat(sample_source, _piped =True),
                    "-n{}".format(OBS_SKIP)
                   ),"-n{}".format(OBS_PRINT)))
except Exception as exc:
    print("Could not print sample observations: {!r}".format(exc))

# Initialize Parallel Computing

In [None]:
# Configured for: equity
if PARALLEL:
    # Connect to the running IPython.parallel cluster and report its size.
    ipython_parallel = Client()
    print("{} active computing engines".format(len(ipython_parallel.ids)))

    lbv = ipython_parallel.load_balanced_view()

    # Deliberately shadows the builtin `map`: later cells call map(...) and
    # thereby dispatch through the load-balanced view when PARALLEL is set.
    def map(f, itertable):
        """Apply `f` to each item via the load-balanced view (non-blocking, unordered)."""
        return lbv.map(f, itertable, block=False, ordered=False)

    @require('socket')
    def host(dummy):
        """Return the hostname of the engine this runs on; `dummy` is ignored."""
        return socket.gethostname()

    # One task per engine id; the node number is the integer found between
    # 'equity' and the first '.' of each reported hostname.
    nodes = [
        int(hostname.split('equity')[1].split(".")[0])
        for hostname in lbv.map(host, ipython_parallel.ids)
    ]

    # (node number, count) pairs, sorted.
    print(sorted(Counter(nodes).items()))

# Run them all!

In [None]:
# Progress message printed before the processing cells below run.
print("starting processing Amazon Data Request")

In [None]:
# Whitespace-separated list of names; empty as written.
specific_tables = """
""".split()

# Names of every global that is callable and whose name is all upper-case.
all_tables = [
    name
    for name, obj in globals().items()
    if callable(obj) and name.isupper()
]
all_tables

In [None]:
def run_function(f,RAW_FILE=RAW_FILE,OUT_BASE=OUT_BASE,COMPRESS_LEVEL=COMPRESS_LEVEL):
    """Call `f` with the raw-file path, output template and compression level.

    The three settings default to the notebook globals as they were when this
    function was defined, so a caller only has to supply `f`.
    Returns whatever `f` returns.
    """
    call_args = (RAW_FILE, OUT_BASE, COMPRESS_LEVEL)
    return f(*call_args)

# Look up each name in all_tables, run it through run_function via map
# (the load-balanced version when PARALLEL is set), and print a 1-based
# counter next to each result as it is produced.
table_functions = [globals()[name] for name in all_tables]
results = map(run_function, table_functions)
for i, file in enumerate(results):
    print(i + 1, file)

In [None]:
# Completion message printed once the cells above have finished.
print("DONE")