In [1]:
import os
import gc
import time
import numpy as np
import pandas as pd
import sys

sys.path.append('../python/property')
sys.path.append('../python/score')
sys.path.append('../python/administrativeArea/')
import warnings

warnings.filterwarnings("ignore")

from airline import Airline
from hotel import Hotel
from luxury import Luxury
from tour import Tour
from shopping import Shopping
from resort import Resort
from gender import Gender
from age import Age
from device import Device

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask.multiprocessing import get
from ranking import load_data, process_url_property, process_demography, process_hardware, get_unique_user_id
from vietnamADM import AdministrativeArea

In [2]:
# Start an in-process Dask cluster and connect a client to it; later Dask work
# in this notebook runs through this client.
# NOTE(review): ip="0.0.0.0" presumably makes the scheduler/dashboard reachable
# from other hosts — confirm that this exposure is intended.
cluster = LocalCluster(ip="0.0.0.0")
client = Client(cluster)
# Last expression of the cell: renders the client summary.
client

0,1
Client  Scheduler: tcp://10.3.0.79:45407  Dashboard: http://10.3.0.79:8787/status,Cluster  Workers: 8  Cores: 40  Memory: 270.38 GB


In [3]:
# Date window shared by every input file name.
from_date = '2018-11-14'
end_date = '2018-11-27'

# Working data directory (created if missing) and the path later handed to AdministrativeArea.
PATH = '/home/phongdk/data_user_income_targeting'
VNM_ADM_PATH = '/home/phongdk/VNM_adm'
if not os.path.exists(PATH):
    os.makedirs(PATH)


def _dated_filename(prefix):
    """Return '<prefix>_from_<from_date>_to_<end_date>.csv.gz' for the configured window."""
    return "{}_from_{}_to_{}.csv.gz".format(prefix, from_date, end_date)


# Per-user attribute inputs.
filename_demography = _dated_filename("demography")
filename_hardware = _dated_filename("hardware")
filename_location = _dated_filename("location")

# URL-based inputs.
filename_airline = _dated_filename("airline")
filename_luxury = _dated_filename("luxury")
filename_booking_resort = _dated_filename("booking_resort")
filename_booking_hotel = _dated_filename("booking_hotel")
filename_tour = _dated_filename("tour")
filename_shopping = _dated_filename("shopping")

# The two lists below are consumed pairwise (zip), so their order must match.
filename_based_url = [
    filename_airline,
    filename_luxury,
    filename_booking_resort,
    filename_booking_hotel,
    filename_tour,
    filename_shopping,
]
class_property_based_url = [
    Airline('airline'),
    Luxury('luxury'),
    Resort('resort'),
    Hotel('hotel'),
    Tour('tour'),
    Shopping('shopping'),
]

assert len(filename_based_url) == len(class_property_based_url)

In [None]:
def process_location(path, filename, nrows=None):
    """Load the location file and attach an ``address`` column resolved from lat/lon.

    Parameters
    ----------
    path : passed through to ``load_data`` as the directory argument.
    filename : passed through to ``load_data`` as the file name.
    nrows : passed through to ``load_data``; defaults to ``None``.

    Returns
    -------
    A Dask DataFrame (32 partitions) built from the loaded frame, with an extra
    ``address`` column holding ``find_address(lat, lon)`` for every row. The
    result is lazy; nothing is evaluated until the caller computes it.
    """
    vn_adm = AdministrativeArea(VNM_ADM_PATH)
    # load_data is assumed to return a pandas DataFrame with 'lat' and 'lon' columns.
    df = load_data(path, filename, nrows=nrows)
    df_dask = dd.from_pandas(df, npartitions=32)
    # Keep the lookup as a Dask Series instead of calling .compute(): the column
    # then stays partition-aligned with df_dask and is not pulled into the client
    # only to be attached to a Dask frame again. `meta` declares the output as an
    # object column (addresses are displayed as strings elsewhere in this notebook),
    # so Dask does not have to infer it by running the lookup on dummy data.
    address = df_dask.apply(lambda row: vn_adm.find_address(row['lat'], row['lon']),
                            axis=1, meta=('address', 'object'))
    return df_dask.assign(address=address)

In [4]:
# Resolver used below to turn (lat, lon) pairs into addresses via find_address.
vn_adm = AdministrativeArea(VNM_ADM_PATH)
# Load the location file for the configured date window.
# nrows=1000 presumably limits the load to a small sample — confirm in load_data.
df = load_data(PATH, filename_location, nrows=1000)

Load dictionary from: ../../external_data/location/lat_lon_to_location.csv.gz
Load data from file : location_from_2018-11-14_to_2018-11-27.csv.gz


In [5]:
# Peek at the first rows of the loaded location sample.
df.head()

Unnamed: 0,user_id,lat,lon
0,4179731125044,10.9545,108.064
1,10846855546626,10.7398,106.705
2,10846855546626,10.7405,106.694
3,10846855546626,10.7405,106.695
4,10846855546626,10.7479,106.705


In [6]:
def _address_of(row):
    """Resolve one row's lat/lon through the shared AdministrativeArea instance."""
    return vn_adm.find_address(row['lat'], row['lon'])


# Row-wise lookup on the pandas sample; the result becomes the 'address' column.
df['address'] = df.apply(_address_of, axis=1)

In [7]:
# Keep one row per distinct (user_id, address) pair; lat/lon are no longer needed.
pair_columns = ['user_id', 'address']
df = df[pair_columns].drop_duplicates(subset=pair_columns)
df.shape

(472, 2)

In [21]:
# Collect every address seen for a user into one list, indexed by user_id.
addresses_by_user = df.groupby('user_id')['address'].apply(list)
df_regions = addresses_by_user.to_frame()
df_regions.shape

(280, 1)

In [22]:
# Inspect the per-user address lists.
df_regions.head()

Unnamed: 0_level_0,address
user_id,Unnamed: 1_level_1
106243222964128,[aboard]
10846855546626,[hồ chí minh]
111426187200876,"[lệ thủy, quảng bình, đồng hới, quảng bình, bố..."
111479027414108,[hà nội]
111863491531868,"[hồ chí minh, mang yang, gia lai, đà nẵng, huế..."


In [24]:
# Show the full (untruncated) address list of the fifth user.
df_regions.iloc[4]['address']

['hồ chí minh', 'mang yang, gia lai', 'đà nẵng', 'huế, thừa thiên huế']

In [28]:
def is_traveller_by_checking_locations(list_places):
    """Score a user as traveller / non-traveller from the places they were seen in.

    Each place is a string such as ``'mang yang, gia lai'`` or ``'hồ chí minh'``;
    the text after the last comma (or the whole string when there is no comma)
    is taken as the province. Surrounding whitespace is stripped so that
    ``'hà nội'`` and ``'ba đình, hà nội'`` count as the same province.

    Parameters
    ----------
    list_places : iterable of place strings for one user.

    Returns
    -------
    5.0 when the user appears in more than THRESHOLD_TRAVELLER distinct
    provinces, otherwise 0 (also for an empty input).
    """
    THRESHOLD_TRAVELLER = 2
    score_traveller = {"traveller": 5.0,
                       "non-traveller": 0}
    # A set comprehension de-duplicates directly; .strip() removes the space that
    # follows the comma in "district, province" strings.
    provinces = {place.split(",")[-1].strip() for place in list_places}
    if len(provinces) > THRESHOLD_TRAVELLER:
        return score_traveller['traveller']
    return score_traveller['non-traveller']

In [31]:
def get_score(list_places):
    """Return the highest address score among a user's places.

    A place equal to ``'hồ chí minh'`` or ``'hà nội'`` scores 5.0; any other
    place scores the ``'others'`` value (1). Matching is on the whole place
    string, exactly as stored in the list.

    Parameters
    ----------
    list_places : sequence of place strings for one user.

    Returns
    -------
    The maximum score over all places, or the ``'others'`` score when the
    sequence is empty.
    """
    score = {"hồ chí minh" : 5.0,
            "hà nội": 5.0,
            "others": 1}
    # np.max raises on an empty sequence; a user with no places gets the baseline.
    if len(list_places) == 0:
        return score['others']
    # dict.get replaces the former bare `except:`, which hid every kind of error.
    list_scores = [score.get(place, score['others']) for place in list_places]
    return np.max(list_scores)

In [29]:
# Series.apply takes the scoring function directly; each element is one user's address list.
df_regions['traveller'] = df_regions['address'].apply(is_traveller_by_checking_locations)

In [32]:
# Series.apply takes the scoring function directly; each element is one user's address list.
df_regions['address_score'] = df_regions['address'].apply(get_score)

In [33]:
# Inspect the first ten users together with their traveller and address scores.
df_regions.head(10)

Unnamed: 0_level_0,address,traveller,address_score
user_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
106243222964128,[aboard],0.0,1.0
10846855546626,[hồ chí minh],0.0,5.0
111426187200876,"[lệ thủy, quảng bình, đồng hới, quảng bình, bố...",0.0,1.0
111479027414108,[hà nội],0.0,5.0
111863491531868,"[hồ chí minh, mang yang, gia lai, đà nẵng, huế...",5.0,5.0
115617306757219,"[long xuyên, an giang, hồ chí minh]",0.0,5.0
116830224482424,"[thạnh phú, bến tre, giồng trôm, bến tre, mỏ c...",0.0,1.0
119609590444140,[hồ chí minh],0.0,5.0
123905934427628,[hồ chí minh],0.0,5.0
135176662268404,"[vũng tàu, bà rịa-vũng tàu, long điền, bà rịa-...",0.0,1.0




In [None]:
# Convert the pandas frame `df` into a Dask DataFrame with 32 partitions.
# NOTE(review): `df_dask` is not referenced by any later cell visible in this notebook.
df_dask = dd.from_pandas(df, npartitions=32)

In [None]:
# Run the Dask-based address lookup; nrows=10000 is forwarded to load_data.
df_location = process_location(PATH, filename_location, nrows=10000)

In [None]:
# Demography and hardware inputs (nrows=1000 is forwarded to the loaders).
df_demography = process_demography(PATH, filename_demography, nrows=1000)
# `process_hardware` is the loader imported from `ranking`; the previously used
# name `process_hardware_location` is not defined anywhere in this notebook.
df_hardware = process_hardware(PATH, filename_hardware, nrows=1000)

# One frame per URL-based property, appended after the two frames above.
# Files and property objects are paired by position.
list_df = [df_demography, df_hardware]
for (filename, class_property) in zip(filename_based_url, class_property_based_url):
    list_df.append(process_url_property(PATH, filename, count_name=class_property.get_name(),
                                        threshold=class_property.get_threshold()))

    

In [None]:
# Inspect the first rows of the demography frame.
df_demography.head()

In [None]:
# Inspect the first rows of the hardware frame.
df_hardware.head()

In [None]:
def merge_data(list_df, numeric_col):
    """Outer-join a list of pandas DataFrames on their index as one Dask DataFrame.

    Parameters
    ----------
    list_df : non-empty list of pandas DataFrames sharing a common index
        (presumably the user id — confirm against the loaders).
    numeric_col : currently unused; kept so existing callers keep working.

    Returns
    -------
    Dask DataFrame holding the index-wise outer join of all inputs, in list
    order, with missing values replaced by -1.

    Raises
    ------
    ValueError
        If ``list_df`` is empty (this used to surface as an UnboundLocalError).
    """
    if len(list_df) == 0:
        raise ValueError("merge_data needs at least one DataFrame to merge")

    print("-----------------------Merge data-----------------------------")
    # Each input becomes a Dask DataFrame split into 4 partitions.
    list_dask_df = [dd.from_pandas(df, npartitions=4) for df in list_df]

    # Fold the remaining frames onto the first one, left to right.
    df_total = list_dask_df[0]
    for df in list_dask_df[1:]:
        df_total = df_total.merge(df, left_index=True, right_index=True, how='outer')

    # Rows missing from some inputs get -1 in that input's columns.
    return df_total.fillna(-1)

In [None]:
%%time
# Columns handed to merge_data as its second argument.
numeric_columns =['gender', 'age']
# Outer-join all feature frames collected in list_df into one frame.
df_total = merge_data(list_df, numeric_columns)

In [None]:
# Number of columns in the merged frame.
len(df_total.columns)

In [None]:
# Number of rows in the merged frame.
# NOTE(review): df_total is built from Dask frames in merge_data, so this
# presumably triggers a full computation — confirm before using on the full data.
len(df_total)

In [None]:
# Properties to score: gender and age first, then the URL-based properties, then device.
properties = [Gender('gender'), Age('age')] + class_property_based_url + [Device('device')]
# Variant without the device property:
#properties = [Gender('gender'), Age('age')] + class_property_based_url

In [None]:
# Print the list of property objects that will be scored.
print(properties)

In [None]:
# for (i, proper) in enumerate(properties):
#     col = proper.get_name()
#     print(i, col)
#     #df_dask = df_total.map_partitions(lambda x: proper.get_score(x[col]), meta=object)
#     #df_dask = df_total.map_partitions(lambda df: df.apply(lambda x: 10, axis=1)).compute()
#     #df_dask = df_total[col].map_partitions(lambda x: x.values[0], meta={col:'i8'}).compute()
#     print(proper.get_score(1))
#     print(df_total[col].unique().compute())
#     df_dask = df_total[col].apply(lambda x: properties[i].get_score(int(x))).compute()
#     name = col + "_score"
#     df_total = df_total.assign(**{name : df_dask})

In [None]:
#df_total.to_csv('/home/phongdk/tmp/*.csv')

In [None]:
def compute_score(df, properties):
    """Add one ``<name>_score`` column per property and return only the score columns.

    Parameters
    ----------
    df : DataFrame (a Dask DataFrame in this notebook) containing one column per
        property name, plus the hardware columns listed below for the
        ``'device'`` property.
    properties : iterable of property objects exposing ``get_name()`` and
        ``get_score(value)``. For the property named ``'device'``, ``get_score``
        receives a whole row of hardware columns instead of a single value.

    Returns
    -------
    DataFrame restricted to the columns whose name contains ``'score'``. With a
    Dask input the result stays lazy; nothing is computed here.
    """
    print('----------------------Compute SCORE ---------------------------')
    device_columns = ['os_name', 'hw_class', 'cpu', 'sys_ram_mb',
                      'screen_height', 'screen_width']
    for proper in properties:
        print(proper)
        col = proper.get_name()
        # Pass the bound method rather than a lambda closing over `proper`:
        # with a lazy Dask apply, a lambda would look `proper` up only when the
        # graph is evaluated, i.e. after the loop has moved on to another property.
        scorer = proper.get_score
        if col == 'device':
            scores = df[device_columns].apply(scorer, axis=1)
        else:
            # No .compute() here, so both branches stay lazy and the new column
            # remains aligned with df's partitions when assigned.
            scores = df[col].apply(scorer)
        df = df.assign(**{col + "_score": scores})

    col_score = [name for name in df.columns if 'score' in name]
    return df[col_score]

In [None]:
# Score every property of the merged frame; compute_score returns only the score columns.
df_score = compute_score(df_total, properties)
# NOTE(review): the timing line below is disabled and refers to `start_time`,
# which is not defined in any visible cell.
#print("--- %s seconds ---" % (time.time() - start_time))


In [None]:
# Inspect the per-property score columns.
df_score.head()

In [None]:
# Sum only the per-property score columns, so re-running this cell does not fold
# an already written total_score back into the new total.
score_columns = [name for name in df_score.columns if name != 'total_score']
# Keep the row-wise sum lazy (no .compute()): the column stays aligned with
# df_score's partitions instead of being materialised and re-attached.
df_score['total_score'] = df_score[score_columns].sum(axis=1)

In [None]:
# Inspect the scores again, now including total_score.
df_score.head()

In [None]:
# List the distinct hotel_score values, materialised with .compute() for inspection.
df_score['hotel_score'].unique().compute()