## Exploring Dask Framework & Client

Building a high-performance data processing pipeline for sampling. The code may be useful for later implementations and exploration.

In [None]:
# Some important dependencies
!pip install --upgrade pip
!pip install python-decouple pandas pyarrow

### Start Dask Client for Dashboard

**Source** https://examples.dask.org/dataframe.html

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')

In [2]:
# Show client stats (dashboard)
client

Client
  Scheduler: tcp://127.0.0.1:40005
  Dashboard: http://127.0.0.1:8787/status
Cluster
  Workers: 4
  Cores: 8
  Memory: 8.00 GB


### Download S3 Data to dask dataframe

In [13]:
###Connect to S3###
from awstools.awstools import s3

def get_bucket(bucket_name='yelp-data-shared-labs18'):
    return s3.Bucket(bucket_name)

In [3]:
import dask
import dask.dataframe as dd
import pandas as pd
import os

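def load_json(filename, npartitions=4):
    """
    Load a local JSON file into a dask dataframe.

    Parameters
    ----------
    filename : str
        Path to the JSON file, relative to the current working directory.
    npartitions : int
        Number of partitions to split the dataframe into.

    Returns
    -------
    dask.dataframe.DataFrame
    """
    filepath = os.path.join(os.getcwd(), filename)
    # Errors from read_json propagate to the caller unchanged
    return dd.read_json(filepath).repartition(npartitions=npartitions)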
    

In [17]:
# Manually download file
bucket = get_bucket()
filename='sample_data/tip.parquet'
bucket.get(filename, 'tip.parquet')

In [4]:
df = load_json(filename='business.json')

In [5]:
display(type(df), df.head())

dask.dataframe.core.DataFrame

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
0,1SWheh84yJXfytovILXOAQ,Arizona Biltmore Golf Club,2818 E Camino Acequia Drive,Phoenix,AZ,85016,33.522143,-112.018481,3.0,5,0,{'GoodForKids': 'False'},"Golf, Active Life",
1,QXAEGFB4oINsVuTFxEYKFQ,Emerald Chinese Restaurant,30 Eglinton Avenue W,Mississauga,ON,L5R 3E7,43.605499,-79.652289,2.5,128,1,"{'RestaurantsReservations': 'True', 'GoodForMe...","Specialty Food, Restaurants, Dim Sum, Imported...","{'Monday': '9:0-0:0', 'Tuesday': '9:0-0:0', 'W..."
2,gnKjwL_1w79qoiV3IC_xQQ,Musashi Japanese Restaurant,"10110 Johnston Rd, Ste 15",Charlotte,NC,28210,35.092564,-80.859132,4.0,170,1,"{'GoodForKids': 'True', 'NoiseLevel': 'u'avera...","Sushi Bars, Restaurants, Japanese","{'Monday': '17:30-21:30', 'Wednesday': '17:30-..."
3,xvX2CttrVhyG2z1dFg_0xw,Farmers Insurance - Paul Lorenz,"15655 W Roosevelt St, Ste 237",Goodyear,AZ,85338,33.455613,-112.395596,5.0,3,1,,"Insurance, Financial Services","{'Monday': '8:0-17:0', 'Tuesday': '8:0-17:0', ..."
4,HhyxOkGAM07SRYtlQ4wMFQ,Queen City Plumbing,"4209 Stuart Andrew Blvd, Ste F",Charlotte,NC,28217,35.190012,-80.887223,4.0,4,1,"{'BusinessAcceptsBitcoin': 'False', 'ByAppoint...","Plumbing, Shopping, Local Services, Home Servi...","{'Monday': '7:0-23:0', 'Tuesday': '7:0-23:0', ..."


In [6]:
df.tail()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
192604,nqb4kWcOwp8bFxzfvaDpZQ,Sanderson Plumbing,,North Las Vegas,NV,89032,36.213732,-115.177059,5.0,9,1,{'BusinessAcceptsCreditCards': 'True'},"Water Purification Services, Water Heater Inst...","{'Monday': '0:0-0:0', 'Tuesday': '0:0-0:0', 'W..."
192605,vY2nLU5K20Pee-FdG0br1g,Chapters,17440 Yonge Street,Newmarket,ON,L3Y 6Y9,44.052658,-79.48185,4.5,3,1,"{'RestaurantsPriceRange2': '2', 'BikeParking':...","Books, Mags, Music & Video, Shopping",
192606,MiEyUDKTjeci5TMfxVZPpg,Phoenix Pavers,21230 N 22nd St,Phoenix,AZ,85024,33.679992,-112.035569,4.5,14,1,"{'BusinessAcceptsCreditCards': 'True', 'ByAppo...","Home Services, Contractors, Landscaping, Mason...","{'Monday': '7:0-15:0', 'Tuesday': '7:0-15:0', ..."
192607,zNMupayB2jEHVDOji8sxoQ,Beasley's Barber Shop,4406 E Main St,Mesa,AZ,85205,33.416137,-111.735743,4.5,15,1,"{'RestaurantsPriceRange2': '1', 'BusinessAccep...","Beauty & Spas, Barbers","{'Tuesday': '8:30-17:30', 'Wednesday': '8:30-1..."
192608,c1f_VAX1KIK8-JoVhjbYOw,Oriental Relax,3735 S Las Vegas Blvd,Las Vegas,NV,89109,36.107267,-115.17192,4.0,3,0,"{'BikeParking': 'False', 'RestaurantsPriceRang...","Massage, Beauty & Spas","{'Monday': '10:0-0:0', 'Tuesday': '10:0-0:0', ..."


In [None]:
###Get/Transcribe File Data###
file_list = [
    ('business.json', 132),
    ('user.json', 2300),
    ('review.json', 5000),
    ('photo.json', 25),
    ('checkin.json', 390),
    ('tip.json', 234),
]


# Set max file size for output
max_file_size = 20  # MB


# Calculate fraction of dataset to capture
fraction_list = [(x[0], max_file_size/x[1]) for x in file_list]


print(fraction_list)
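
The fraction calculation above would return a value greater than 1 for any file smaller than `max_file_size`, which is not a valid sampling fraction without replacement. A minimal sketch of a capped version follows. The helper name `sample_fraction` is hypothetical, and it assumes rows are roughly uniform in size, so that a fraction of rows approximates the same fraction of bytes.

```python
# Approximate file sizes in MB, copied from the cell above.
file_list = [
    ('business.json', 132),
    ('user.json', 2300),
    ('review.json', 5000),
    ('photo.json', 25),
    ('checkin.json', 390),
    ('tip.json', 234),
]
max_file_size = 20  # MB


def sample_fraction(size_mb, target_mb):
    """Fraction of rows to keep so the output is roughly target_mb, capped at 1.0."""
    return min(1.0, target_mb / size_mb)


# Map each file name to its sampling fraction.
fractions = {name: sample_fraction(size, max_file_size) for name, size in file_list}

for name, frac in fractions.items():
    print(f"{name}: {frac:.4f}")
```

A dictionary keyed by file name is used here instead of a list of tuples so that a fraction can be looked up directly when sampling a given file.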

So far, Pandas works as a drop-in replacement for Dask here. I think staying with Dask for now is a good idea in case we want to sample with computation (future feature generation via `submit(apply_func, dataframe)`).

## Generate Sample Data

Create a sampling procedure to extract a subset of the data. The first version is a simple random sample, but some time series analysis may be useful for future samples.
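
As a plain-Python illustration of simple random sampling without replacement, the sketch below draws a fixed fraction of a record list. It is not how Dask samples internally. The function name `simple_random_sample` and the `seed` parameter are assumptions added for illustration; a fixed seed makes the sample reproducible.

```python
import random


def simple_random_sample(records, fraction, seed=None):
    """Return about `fraction` of `records`, drawn without replacement."""
    if not 0 <= fraction <= 1:
        raise ValueError("fraction must be between 0 and 1")
    rng = random.Random(seed)  # a private generator keeps the sample reproducible
    k = round(len(records) * fraction)
    return rng.sample(records, k)


records = [f"record {i}" for i in range(100)]
sample = simple_random_sample(records, 0.2, seed=42)
print(len(sample))
```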

In [7]:
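def sample_data(dataframe, fraction, filename=None):
    """
    Return a sampled pandas dataframe, or save it to `filename` if one is given.

    Files ending in .json are written as JSON records; anything else as parquet.
    """
    sample = dataframe.sample(frac=fraction, replace=False).compute()
    if filename is None:
        return sample
    if filename.endswith('.json'):
        sample.to_json(filename, orient='records')
    else:
        sample.to_parquet(filename)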

In [31]:
sdf = sample_data(df, 0.2)
sdf.head()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
127621,4B6HGu5C68dfUk5_N2lbYg,Porta Via Restaurant & Catering,"5399 Eglinton Avenue W, Suite 104",Toronto,ON,M9C 5K6,43.66303,-79.589617,4.5,38,1,"{'HasTV': 'False', 'RestaurantsAttire': 'u'cas...","Sandwiches, Salad, Event Planning & Services, ...","{'Monday': '11:0-15:0', 'Tuesday': '11:0-15:0'..."
65146,mS_Vorll2cVmux4ZNk7Apw,"North Valley Eye Care, PC","2525 W Carefree Hwy, Ste 116",Phoenix,AZ,85085,33.797942,-112.114679,5.0,34,1,"{'AcceptsInsurance': 'True', 'BusinessAcceptsC...","Health & Medical, Optometrists","{'Monday': '8:0-17:30', 'Tuesday': '8:0-17:30'..."
16932,Fa0eq4Gwl8Tuc168Sm1rpA,ISM Raceway,7602 S Avondale Blvd,Avondale,AZ,85323,33.374918,-112.311246,4.0,41,1,{'GoodForKids': 'True'},"Race Tracks, Active Life, Arts & Entertainment","{'Monday': '8:0-17:0', 'Tuesday': '8:0-17:0', ..."
97928,7TLrRPcU_fPPESzEXhrxmA,Food Basics,5559 Dundas Street W,Etobicoke,ON,M9B 1B9,43.628265,-79.550149,3.0,4,1,"{'RestaurantsTakeOut': 'False', 'Caters': 'Fal...","Grocery, Food","{'Monday': '8:0-22:0', 'Tuesday': '8:0-22:0', ..."
4032,PXljOYf3NJpIhJ9dQuKFhg,Irpinia Kitchen Sales,"1280 Castlefield Avenue, 278 Newkirk Road",Richmond Hill,ON,L4C 3G7,43.699411,-79.459226,2.5,3,0,,"Kitchen & Bath, Shopping, Home & Garden",


In [32]:
# business.json attribute structure will not save to parquet natively.  Need to flatten/expand
# sdf.to_parquet('sample_business.parquet')
sdf.to_json('sample_business.json', orient='records')

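**sample_data()** returns a pandas DataFrame! The dask `DataFrame.sample()` call itself is lazy; it is the `.compute()` call that materializes the result in pandas.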

In [None]:
sample_data(df, 0.2, 'sample_business.json')

### Read from parquet or JSON (into pandas, now that we have a smaller file)

In [4]:
import pandas as pd
import os

filename = 'sample_users.parquet'
filepath = os.path.join(os.getcwd(), 'db_api', filename)

# Named sample_df so it does not shadow the sample_data() function defined earlier
sample_df = pd.read_parquet(filepath)
# sample_df = pd.read_json(filepath)
sample_df.head()

Unnamed: 0,user_id,name,review_count,yelping_since,useful,funny,cool,elite,friends,fans,...,compliment_more,compliment_profile,compliment_cute,compliment_list,compliment_note,compliment_plain,compliment_cool,compliment_funny,compliment_writer,compliment_photos
1531760,ULGM_WSrSHk7SPfOngN6jA,Allyn,33,2014-04-16 14:07:29,46,8,3,,"lYp818T-xh8Ss79To5Azhw, OpDDS05v_O3HSnIxCFN2Wg",1,...,0,0,0,0,4,0,0,0,1,0
1125613,YHSvgBH1kdiOzreTr7RJAA,Allam Dinesh,1,2015-05-03 22:43:58,3,1,1,,,0,...,0,0,0,0,0,0,0,0,0,0
894988,aZqHLzOGv6G20Fvw8l7NXw,J,30,2010-02-05 21:26:53,34,5,9,,"3z7E1Gg001_1YdZZYDnO2A, HF5tvVj7cWWwth2CRnkKcA",0,...,0,0,0,0,1,0,1,1,0,1
560230,E-_4sIgh4YNne_q4Mj5HBw,Shawn,1,2010-09-04 18:48:46,0,0,0,,jUbRgQeWSm4G4tjLB-59hQ,0,...,0,0,0,0,0,0,0,0,0,0
843815,qn4Zc-MaITdhzaEq-Zl6oA,Jennifer,30,2010-07-15 00:51:43,24,5,10,,"gn5fwkEcKR4uF9wwVr2fCA, 0yfhwU4AhU7ev90sNzVrww...",2,...,0,0,0,0,0,1,0,0,0,0


In [35]:
# Save to bucket
from awstools.awstools import s3

bucket = s3.Bucket(bucket_name='yelp-data-shared-labs18')

In [37]:
local_filename = 'sample_business.json'
save_name = 'sample_data/sample_business.json'

bucket.save(file_name=local_filename, object_name=save_name)

sample_business.json  27626611 / 27626611.0  (100.00%)

### List Bucket Contents

Emulate a directory structure on top of S3's flat key storage.

In [None]:
for key in s3.get_bucket_keys(bucket_name='yelp-data-shared-labs18'):
    print(key)

In [None]:
bucket = s3.Bucket(bucket_name='yelp-data-shared-labs18')
bucket.contents

In [None]:
bucket.dir(all=False)

In [None]:
for parq in bucket.find(suffix="parquet"):
    print(parq)
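
The directory emulation mentioned at the top of this section can be sketched in plain Python: S3 stores objects under flat keys, so a "directory" is just a shared key prefix ending in `/`. The function `list_dir` below is a hypothetical helper, not the `awstools` implementation, and apart from the two `sample_data/` keys used earlier in this notebook the key list is made up for illustration.

```python
def list_dir(keys, prefix=''):
    """Return the immediate children of `prefix`, treating '/' as a path separator."""
    children = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if not remainder:
            continue
        head, sep, _ = remainder.partition('/')
        # A trailing '/' marks an emulated subdirectory rather than an object.
        children.add(head + sep)
    return sorted(children)


# Hypothetical keys, for illustration only.
keys = [
    'business.json',
    'sample_data/sample_business.json',
    'sample_data/tip.parquet',
    'sample_data/archive/old.parquet',
]
print(list_dir(keys))
print(list_dir(keys, prefix='sample_data/'))
```

Collecting children in a set collapses every key under the same subdirectory into a single entry, which is what makes the flat listing look hierarchical.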

### DateTime Conversion

DateTime fields must be regenerated from the JSON input: JSON has no native datetime type, so timestamps arrive as plain strings.

Tips, Reviews, Users, and Checkins all have a datetime stamp.

In [12]:
from datetime import datetime

# Deliberately malformed timestamp to exercise the error path
time_str = '2014-04-16 poop14:07:29'

try:
    datetime_object = datetime.fromisoformat(time_str)
except ValueError:
    print('Value Error')
    print(datetime.fromisoformat('1969-01-01'))
    raise
print(datetime_object)

Value Error
1969-01-01 00:00:00


ValueError: Invalid isoformat string: '2014-04-16 poop14:07:29'
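
When converting a whole column, it is often more convenient to substitute a default for malformed values than to raise on the first bad row. The sketch below shows one way to do that with `datetime.fromisoformat`. The helper name `parse_timestamp` and its `default` parameter are assumptions for illustration, not part of the notebook's pipeline.

```python
from datetime import datetime


def parse_timestamp(value, default=None):
    """Parse an ISO-style timestamp string, returning `default` if it is malformed."""
    try:
        return datetime.fromisoformat(value)
    except (TypeError, ValueError):
        # TypeError covers missing values such as None; ValueError covers bad strings.
        return default


print(parse_timestamp('2014-04-16 14:07:29'))
print(parse_timestamp('2014-04-16 poop14:07:29'))
```

Returning `None` (or another sentinel) keeps the bad rows identifiable, so they can be counted or filtered after the conversion.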

### Splitting Query Data

Write validation is taking too long on large POST requests. We can probably get near-linear scaling by using more sessions per application worker, if we can break the record list up effectively.

In [24]:
test_data = {
    'table_name': 'newtable',
    'data': ['record '+ str(x) for x in range(102)]
}

In [25]:
# Splitting the data up
# 20 may be too high in practice, given limits on the number of allowed connections/sessions.
num_splits = 20
query = test_data

def build_databunch(query, num_splits=3):
    """
    Split query['data'] into num_splits contiguous bunches.

    Every bunch holds len(data) // num_splits records, except the last,
    which also takes the remainder.
    """
    databunch = []
    bunch_size = len(query['data']) // num_splits
    for i in range(num_splits):
        start = i * bunch_size
        # The last bunch runs to the end of the list to pick up the remainder
        end = (i + 1) * bunch_size if i < num_splits - 1 else len(query['data'])
        databunch.append(
            {
                'table_name': query['table_name'],
                'data': query['data'][start:end]
            }
        )
    return databunch


In [26]:
databunch = build_databunch(query=query, num_splits=num_splits)
databunch[0]

{'table_name': 'newtable',
 'data': ['record 0', 'record 1', 'record 2', 'record 3', 'record 4']}
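
With `build_databunch`, the final bunch absorbs the whole remainder, so one session can end up with noticeably more work than the rest. An alternative, sketched below under the assumption that balanced bunches are preferable, spreads the remainder one record at a time across the first bunches. The function name `split_evenly` is hypothetical.

```python
def split_evenly(records, num_splits):
    """Split `records` into `num_splits` contiguous chunks whose sizes differ by at most one."""
    if num_splits < 1:
        raise ValueError("num_splits must be at least 1")
    base, extra = divmod(len(records), num_splits)
    chunks = []
    start = 0
    for i in range(num_splits):
        # The first `extra` chunks each take one additional record.
        end = start + base + (1 if i < extra else 0)
        chunks.append(records[start:end])
        start = end
    return chunks


records = ['record ' + str(x) for x in range(102)]
chunks = split_evenly(records, 20)
print([len(chunk) for chunk in chunks])
```

For 102 records and 20 splits this gives two chunks of 6 and eighteen chunks of 5, instead of nineteen chunks of 5 and one of 7.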

In [27]:
# Now map query creation and execution to the databunch

from multiprocessing import Pool

class Query:  # Example class
    def __init__(self, query):
        self.len = len(query['data'])

    def execute(self):
        print(self.len)

# Process function to map onto databunch
def run_query(query):
    query_session = Query(query)
    query_session.execute()


# The context manager shuts the worker processes down once the map completes
with Pool(len(databunch)) as p:
    results = p.map(run_query, databunch)
results



5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
7


[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]
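
The list of `None` values above appears because `run_query` returns nothing. If each worker returns the number of records it handled, the caller can check that the whole list was processed. The sketch below uses `concurrent.futures.ThreadPoolExecutor` in place of `multiprocessing.Pool`, on the assumption that the real queries are I/O-bound database calls where threads are sufficient. The `run_query` body and the three small bunches are stand-ins for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def run_query(query):
    """Stand-in for executing one bunch; returns the number of records handled."""
    return len(query['data'])


# Hypothetical bunches shaped like the output of build_databunch.
databunch = [
    {'table_name': 'newtable', 'data': ['record ' + str(x) for x in range(start, stop)]}
    for start, stop in [(0, 5), (5, 10), (10, 17)]
]

# One thread per bunch; executor.map preserves the input order of the results.
with ThreadPoolExecutor(max_workers=len(databunch)) as executor:
    counts = list(executor.map(run_query, databunch))

print(counts, sum(counts))
```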