In [1]:
import os
import pathlib
import shutil
import subprocess
import uuid
from datetime import datetime, timedelta
from random import randint

import pandas as pd
from google.cloud import bigquery, storage
from google.cloud.exceptions import NotFound
from sklearn.model_selection import train_test_split

In [2]:
# BigQuery client shared by all helpers below. Jobs (queries, extracts) issued
# through it run under this project; the intermediate tables are written under
# BASE_BIG_QUERY_PATH.
client = bigquery.Client(
    project="sharechat-production",
)



In [3]:
# Languages to build data/models for; uncomment entries to add more.
LANGS = [
    "Hindi",
    # "Tamil",
    # "Telugu",
    # "Kannada",
    # "Punjabi",
    # "Odia",
    # "Bengali",
    # "Marathi",
    # "Malayalam",
    # "Gujarati",
]

# Window lengths in days, handed to the query helpers as `days` / `common_posts_days`.
TRAINING_DAYS, TESTING_DAYS = 30, 3

# Label name -> value substituted for `{rating_def}` in the SQL templates (it also
# names the per-label tables and output folders). Uncomment to train more labels.
rating_def_dict = dict(
    # vplay="is_vp_succ",
    # like="is_like",
    # share="is_share",
    # fav="is_fav",
    vplay_skip="is_vp_skip",
    vplay2="is_vp_succ2",
)

# "<project>.<dataset>" under which all intermediate tables are written.
BASE_BIG_QUERY_PATH = "maximal-furnace-783.rohitrr"

# Seed for train_test_split in the exploratory cells at the end of the notebook.
RANDOM_SEED = 9745

In [4]:
def readSqlFile(file_path, lang, rating_def = "", q0_table = "",
                q1_table = "", q2_table = "", end_time = "", days = 2,
                common_posts_end_time = "", common_posts_days = 2):
    """Read a SQL template from disk and fill in its ``str.format`` placeholders.

    The template may use ``{language}``, ``{rating_def}``, ``{q0table}``,
    ``{q1table}``, ``{q2table}``, ``{end_time}``, ``{days}``,
    ``{common_posts_end_time}`` and ``{common_posts_days}``. Because
    ``str.format`` is used, literal braces in the SQL must be doubled
    (``{{`` / ``}}``).

    Args:
        file_path: path to the ``.sql`` template (read as UTF-8).
        lang: substituted for ``{language}``.
        rating_def: substituted for ``{rating_def}``.
        q0_table, q1_table, q2_table: table paths substituted for
            ``{q0table}``, ``{q1table}``, ``{q2table}``.
        end_time, common_posts_end_time: datetime/date objects are rendered as
            ``'%Y-%m-%d %H:%M:%S'``; strings (including the default ``""``) are
            inserted unchanged.
        days, common_posts_days: substituted for ``{days}`` /
            ``{common_posts_days}``.

    Returns:
        The formatted SQL string.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        sql_template = file.read()
    return sql_template.format(
        common_posts_end_time=_format_sql_timestamp(common_posts_end_time),
        common_posts_days=common_posts_days,
        days=days,
        end_time=_format_sql_timestamp(end_time),
        language=lang,
        rating_def=rating_def,
        q0table=q0_table,
        q1table=q1_table,
        q2table=q2_table,
    )


def _format_sql_timestamp(value):
    """Render a datetime as 'YYYY-MM-DD HH:MM:SS'; pass strings through unchanged."""
    if isinstance(value, str):
        return value
    return value.strftime('%Y-%m-%d %H:%M:%S')

def delete_tables(delete_tables_path_list):
    """Delete each BigQuery table listed in ``delete_tables_path_list``.

    Deletions go one at a time through the module-level ``client``. Any error
    from ``client.delete_table`` propagates, leaving the remaining tables untouched.
    """
    for table_path in delete_tables_path_list:
        client.delete_table(table_path)
    print("All tables deleted")

def download_large_table_to_gcs(table_path, bucket_name,
                                format_suffix = ".csv", location = "US",
                                field_delimiter = "\t"):
    '''Export a BigQuery table into a fresh, uniquely named folder in a GCS bucket.

    A wildcard destination ("<folder>/*<suffix>") is used so that tables larger
    than 1 GB can be split by BigQuery across several shard files.

    Args:
        table_path: fully-qualified "<project>.<dataset>.<table>", e.g.
            "maximal-furnace-783.rohitrr.sample_test_table".
        bucket_name: destination GCS bucket.
        format_suffix: extension given to the shard files. The content is
            separated by ``field_delimiter`` whatever this suffix says.
        location: BigQuery location of the source table; must match it.
        field_delimiter: column separator written into the shards.

    Returns:
        The created GCS folder name, "<table>_<UTC timestamp>_<random hex>".
        A new name is generated on every call so shards from earlier exports
        never end up in the same folder.

    Raises:
        ValueError: if ``table_path`` is not exactly three dot-separated parts.
    '''
    print("Downloading table - {} to gcs".format(table_path))
    parts = table_path.split('.')
    if len(parts) != 3:
        raise ValueError(
            f"table_path must look like '<project>.<dataset>.<table>', got {table_path!r}")
    project, dataset_id, table_id = parts

    # Timestamp + random hex keeps repeated exports of the same table apart.
    unique_suffix = (datetime.utcnow().strftime('%Y-%m-%d_%H:%M:%S')
                     + '_' + uuid.uuid4().hex[:8])
    gcs_folder_name = table_id + '_' + unique_suffix

    # Wildcard file name, e.g. "<folder>/*.csv", lets BigQuery shard the export.
    gcs_file_name = gcs_folder_name + "/*" + format_suffix
    destination_uri = "gs://{}/{}".format(bucket_name, gcs_file_name)
    table_ref = bigquery.DatasetReference(project, dataset_id).table(table_id)

    job_config = bigquery.ExtractJobConfig(field_delimiter=field_delimiter)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=job_config,
        location=location,
    )
    extract_job.result()  # Blocks until the export job finishes.

    print(
        "Exported {}:{}.{} to {}".format(project, dataset_id, table_id, destination_uri)
    )
    return gcs_folder_name
    
def download_files_from_folder_in_gcs(bucket_name, gcs_folder_name, dest_folder_path,
                                      project = "maximal-furnace-783"):
    """Download every object under ``gs://<bucket_name>/<gcs_folder_name>/`` locally.

    Local file names are the object names with '/' replaced by '_', e.g.
    "<folder>/000000000000.csv" becomes "<folder>_000000000000.csv".

    Args:
        bucket_name: source GCS bucket.
        gcs_folder_name: folder (object-name prefix) inside the bucket.
        dest_folder_path: local destination folder, created if missing.
        project: project the storage client is created for.
    """
    print("Downloading from gcs_folder_name {} to local".format(gcs_folder_name))

    pathlib.Path(dest_folder_path).mkdir(parents=True, exist_ok=True)

    storage_client = storage.Client(project=project)
    bucket = storage_client.get_bucket(bucket_or_name=bucket_name)
    # The trailing '/' matters: without it, folder "t_..._9" would also match
    # objects in a sibling folder "t_..._96" and mix their shards in.
    folder_prefix = gcs_folder_name.rstrip('/') + '/'
    for blob in bucket.list_blobs(prefix=folder_prefix):
        if blob.name.endswith('/'):
            continue  # folder placeholder object, not data
        local_name = blob.name.replace('/', '_')
        blob.download_to_filename(os.path.join(dest_folder_path, local_name))

    print(f"Contents in gs://{bucket_name}/{gcs_folder_name} "
          f"transferred to {dest_folder_path}")

def merge_and_save_csv_files(csv_files_folder_path, dest_folder_path, with_common_header = True,
                             out_file_name = "merged_out.txt"):
    """Concatenate every ``*.csv`` shard in a folder into one file.

    The first line of each shard is treated as a header and dropped. If
    ``with_common_header`` is True, the first header encountered is written
    once at the top of the merged file. Empty shards are skipped. Every
    written line is newline-terminated, so a shard without a final newline
    cannot run into the next one.

    Args:
        csv_files_folder_path: folder with the shard files. Only names ending
            in ``.csv`` are merged, in sorted order.
        dest_folder_path: output folder, created if missing.
        with_common_header: write a single header line at the top.
        out_file_name: name of the merged file inside ``dest_folder_path``.
    """
    print(f"Merging and saving files from {csv_files_folder_path} to {dest_folder_path}")

    pathlib.Path(dest_folder_path).mkdir(parents=True, exist_ok=True)
    # Sorted so the merged row order is reproducible (os.listdir order is arbitrary).
    csv_file_names = sorted(
        file_name for file_name in os.listdir(csv_files_folder_path)
        if file_name.endswith(".csv")
    )

    header_pending = with_common_header
    out_path = os.path.join(dest_folder_path, out_file_name)
    with open(out_path, "w", encoding="utf-8") as f_out:
        for csv_file_name in csv_file_names:
            csv_file_path = os.path.join(csv_files_folder_path, csv_file_name)
            with open(csv_file_path, encoding="utf-8") as f_csv_in:
                header = next(f_csv_in, None)
                if header is None:
                    continue  # empty shard: nothing to copy
                if header_pending:
                    f_out.write(header if header.endswith("\n") else header + "\n")
                    header_pending = False  # write the header only once
                for line in f_csv_in:
                    f_out.write(line if line.endswith("\n") else line + "\n")
    print(f"Saved file {out_file_name} in {dest_folder_path}")
    
def download_table_to_local_as_one_file(table_path, local_save_path, out_file_name = "fetched_table.csv",
                                        with_header = True, bucket_name = "query_runner_results"):
    """Fetch a BigQuery table into a single local file by way of a GCS export.

    Steps:
    1. Export the table to a new folder in ``bucket_name``.
    2. Download the shards into ``<local_save_path>/temp_download_folder``.
    3. Concatenate them into ``<local_save_path>/<out_file_name>``.
    4. Delete the local temp folder, also when an earlier step fails.

    The exported objects in GCS are not deleted.

    Args:
        table_path: fully-qualified "<project>.<dataset>.<table>".
        local_save_path: local folder receiving the merged file.
        out_file_name: name of the merged file.
        with_header: keep one header line at the top of the merged file.
        bucket_name: GCS bucket used as the staging area for the export.
    """
    local_download_folder_path = os.path.join(local_save_path, "temp_download_folder")
    if os.path.exists(local_download_folder_path):
        print(f"Temporary download folder - {local_download_folder_path} "
              "already present - removing it to avoid using older files")
        shutil.rmtree(local_download_folder_path)
        print("Old temp folder removed")

    try:
        downloaded_gcs_folder_name = download_large_table_to_gcs(table_path, bucket_name)
        download_files_from_folder_in_gcs(bucket_name, downloaded_gcs_folder_name,
                                          local_download_folder_path)
        merge_and_save_csv_files(local_download_folder_path, local_save_path,
                                 with_common_header=with_header,
                                 out_file_name=out_file_name)
    finally:
        # The shard copies can be as large as the table itself, so never leave
        # them behind, not even after a failed step.
        shutil.rmtree(local_download_folder_path, ignore_errors=True)

In [5]:
def construct_base_table(lang, common_posts_end_time,
                         common_posts_days, end_time, days,
                         overwrite_base_table = False,
                         mode = "train"):
    """Materialise ``queries/video/query0.sql`` for one language into a base table.

    The table is ``<BASE_BIG_QUERY_PATH>.<mode>_temp_q0_table_<lang>``.

    Args:
        lang: language name substituted into the SQL template.
        common_posts_end_time, common_posts_days: values for
            ``{common_posts_end_time}`` / ``{common_posts_days}``.
        end_time, days: values for ``{end_time}`` / ``{days}``.
        overwrite_base_table: if False and the table already exists, it is
            reused as-is. NOTE: the existing table is not checked against the
            time arguments; after changing dates pass True to rebuild it.
        mode: table-name prefix, e.g. "train" or "test".

    Returns:
        The fully-qualified path of the base table.
    """
    temp_q0_table_path = f"{BASE_BIG_QUERY_PATH}.{mode}_temp_q0_table_{lang}"
    if not overwrite_base_table:
        try:
            client.get_table(temp_q0_table_path)
        except NotFound:
            print(f"Table-{temp_q0_table_path} not already present - going ahead creating it")
        else:
            print(f"Table-{temp_q0_table_path} already exists, not overwriting")
            return temp_q0_table_path

    print(f"Running query 0 for {lang} .....")
    job_config = bigquery.QueryJobConfig(destination=temp_q0_table_path,
                                         write_disposition="WRITE_TRUNCATE")
    sql = readSqlFile("./queries/video/query0.sql", lang=lang,
                      common_posts_end_time=common_posts_end_time,
                      common_posts_days=common_posts_days,
                      end_time=end_time, days=days)
    client.query(sql, job_config=job_config).result()  # wait for the table to be written
    print(f"Query 0 results loaded to the table {temp_q0_table_path}")
    return temp_q0_table_path

In [6]:
def collect_and_prepare_data_with_base_table(lang, rating_def, base_q0_table_path,
                                             end_time, days,
                                             save_path,
                                             table_with_mapping = None,
                                             mode="train"):
    """Run query1 -> (query2) -> query3 on a base table and download the result.

    Always writes ``<save_path>/<mode>.txt`` (no header) from the query3 table.
    In "train" mode without ``table_with_mapping`` it first runs query2, uses
    its output as the mapping table and downloads it to
    ``<save_path>/user_post_ffm_mapping.csv`` (with header).

    Args:
        lang: language name substituted into the SQL templates.
        rating_def: value for ``{rating_def}``; also part of the table names.
        base_q0_table_path: table produced by ``construct_base_table``.
        end_time, days: values for ``{end_time}`` / ``{days}``.
        save_path: local output folder.
        table_with_mapping: existing query2 (mapping) table. Required unless
            ``mode == "train"``; the training loop passes the training mapping
            table when building test data.
        mode: "train" or "test"; used as table-name prefix and output file name.

    Returns:
        ``(tables_created, table_with_mapping)``: the BigQuery tables this call
        wrote (candidates for ``delete_tables``) and the mapping table used.

    Raises:
        ValueError: if ``table_with_mapping`` is None outside "train" mode.
            Otherwise query3 would be formatted with the literal text "None".
    """
    if table_with_mapping is None and mode != "train":
        raise ValueError(
            f"mode={mode!r} requires table_with_mapping (the training query2 table)")

    tables_created = []
    table_prefix = f"{BASE_BIG_QUERY_PATH}.{mode}"

    # Q1: built on top of the base (query0) table.
    temp_q1_table_path = f"{table_prefix}_temp_q1_table_{lang}_{rating_def}"
    sql = readSqlFile("./queries/video/query1.sql", lang=lang,
                      rating_def=rating_def, end_time=end_time, days=days,
                      q0_table=base_q0_table_path)
    _run_query_into_table(sql, temp_q1_table_path)
    print(f"Query 1 results loaded to the table {temp_q1_table_path}")
    tables_created.append(temp_q1_table_path)

    # Q2: only when training without a mapping supplied by the caller.
    if mode == "train" and table_with_mapping is None:
        table_with_mapping = f"{table_prefix}_temp_q2_table_{lang}_{rating_def}"
        sql = readSqlFile("./queries/video/query2.sql", lang=lang,
                          rating_def=rating_def, end_time=end_time, days=days,
                          q1_table=temp_q1_table_path)
        _run_query_into_table(sql, table_with_mapping)
        print(f"Query 2 results loaded to the table {table_with_mapping}")
        download_table_to_local_as_one_file(table_with_mapping, save_path,
                                            out_file_name="user_post_ffm_mapping.csv")
        tables_created.append(table_with_mapping)

    # Q3: joins the Q1 output with the mapping table.
    temp_q3_table_path = f"{table_prefix}_temp_q3_table_{lang}_{rating_def}"
    sql = readSqlFile("./queries/video/query3.sql", lang=lang,
                      rating_def=rating_def, end_time=end_time, days=days,
                      q1_table=temp_q1_table_path,
                      q2_table=table_with_mapping)
    _run_query_into_table(sql, temp_q3_table_path)
    print(f"Query 3 results loaded to the table {temp_q3_table_path}")
    tables_created.append(temp_q3_table_path)

    # Save the final rows locally as <mode>.txt, without a header line.
    download_table_to_local_as_one_file(temp_q3_table_path, save_path, with_header=False,
                                        out_file_name=f"{mode}.txt")
    return tables_created, table_with_mapping


def _run_query_into_table(sql, destination):
    """Run ``sql`` with the module-level ``client``, overwriting ``destination``; blocks until done."""
    job_config = bigquery.QueryJobConfig(destination=destination,
                                         write_disposition="WRITE_TRUNCATE")
    client.query(sql, job_config=job_config).result()


In [7]:
# Both end times fall at midnight (hours, minutes and seconds default to 0).
# NOTE(review): the saved output of this cell shows 2021-03-30 / 2021-03-27, so it
# predates the switch to 2021-04-30. Re-run before relying on it.
common_end_time = datetime(2021, 4, 30)
test_end_time = common_end_time
# Training ends TESTING_DAYS before the test end. Presumably each query covers the
# `days` days ending at `end_time`; confirm in queries/video/*.sql.
train_end_time = test_end_time - timedelta(days=TESTING_DAYS)
print(test_end_time, train_end_time)

2021-03-30 00:00:00 2021-03-27 00:00:00


In [None]:
%%time
# Set to True to drop the intermediate BigQuery tables of each (language, label)
# run once its data has been downloaded locally.
DELETE_TEMP_TABLES = False


def _run_xlearn_train(save_path, model_output_path):
    """Train with the local ``./xlearn_train`` binary, echoing and logging its output.

    stdout and stderr are printed to the notebook and written to
    ``<model_output_path>/logs``. The exit code returned is xLearn's own; a
    shell pipe into ``tee`` would report ``tee``'s status instead.

    Returns:
        The xLearn process exit code (0 on success).
    """
    argv = [
        "./xlearn_train", os.path.join(save_path, "train.txt"),
        "-v", os.path.join(save_path, "test.txt"),
        "-x", "auc",
        "-s", "2",
        "-k", "32",
        "-m", os.path.join(model_output_path, "model.out"),
        "-t", os.path.join(model_output_path, "model.txt"),
        "-b", "0.001",
        "--disk",
    ]
    with open(os.path.join(model_output_path, "logs"), "w") as log_file, \
            subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                             text=True, errors="replace") as process:
        for line in process.stdout:
            print(line, end="")
            log_file.write(line)
    return process.returncode


for lang in LANGS:
    base_q0_train_table_path = construct_base_table(
        lang, train_end_time, TRAINING_DAYS,
        train_end_time, TRAINING_DAYS,
        overwrite_base_table=False, mode="train")

    # The test base table reuses the training window for its "common posts" arguments.
    base_q0_test_table_path = construct_base_table(
        lang, train_end_time, TRAINING_DAYS,
        test_end_time, TESTING_DAYS,
        overwrite_base_table=False, mode="test")

    for rating_def in rating_def_dict.values():
        save_path = f"./train_test_data_models/{lang}/{rating_def}"
        train_delete_table_paths, train_table_with_mapping = \
            collect_and_prepare_data_with_base_table(
                lang, rating_def, base_q0_train_table_path,
                end_time=train_end_time, days=TRAINING_DAYS,
                save_path=save_path, mode="train")

        # Test data must be encoded with the mapping built from the training data.
        test_delete_table_paths, _ = collect_and_prepare_data_with_base_table(
            lang, rating_def, base_q0_test_table_path,
            table_with_mapping=train_table_with_mapping,
            end_time=test_end_time, days=TESTING_DAYS,
            save_path=save_path, mode="test")

        if DELETE_TEMP_TABLES:
            delete_tables(train_delete_table_paths + test_delete_table_paths)

        print(f"Training started for label {rating_def} in {lang} .......")
        model_output_path = os.path.join(save_path, "out")
        pathlib.Path(model_output_path).mkdir(parents=True, exist_ok=True)
        exit_code = _run_xlearn_train(save_path, model_output_path)
        if exit_code == 0:
            print(f"Model trained and saved in {model_output_path}")
        else:
            # Keep going with the other labels, but make the failure visible.
            print(f"xlearn_train FAILED (exit code {exit_code}) for label {rating_def} "
                  f"in {lang}; see {model_output_path}/logs")

Table-maximal-furnace-783.rohitrr.train_temp_q0_table_Hindi already exists, not overwriting
Table-maximal-furnace-783.rohitrr.test_temp_q0_table_Hindi already exists, not overwriting
Query 1 results loaded to the table maximal-furnace-783.rohitrr.train_temp_q1_table_Hindi_is_vp_skip
Query 2 results loaded to the table maximal-furnace-783.rohitrr.train_temp_q2_table_Hindi_is_vp_skip
Temporary download folder - ./train_test_data_models/Hindi/is_vp_skip/temp_download_folder               already present - removing it to avoid using older files
Old temp folder removed
Downloading table - maximal-furnace-783.rohitrr.train_temp_q2_table_Hindi_is_vp_skip to gcs
Exported maximal-furnace-783:rohitrr.train_temp_q2_table_Hindi_is_vp_skip to gs://query_runner_results/train_temp_q2_table_Hindi_is_vp_skip_2021-04-06_04:57:55_96/*.csv
Downloading from gcs_folder_name train_temp_q2_table_Hindi_is_vp_skip_2021-04-06_04:57:55_96 to local




Contents in gs://query_runner_results/train_temp_q2_table_Hindi_is_vp_skip_2021-04-06_04:57:55_96     transferred to ./train_test_data_models/Hindi/is_vp_skip/temp_download_folder
Merging and saving files from ./train_test_data_models/Hindi/is_vp_skip/temp_download_folder to ./train_test_data_models/Hindi/is_vp_skip
Saved file user_post_ffm_mapping.csv in ./train_test_data_models/Hindi/is_vp_skip
Query 3 results loaded to the table maximal-furnace-783.rohitrr.train_temp_q3_table_Hindi_is_vp_skip
Downloading table - maximal-furnace-783.rohitrr.train_temp_q3_table_Hindi_is_vp_skip to gcs
Exported maximal-furnace-783:rohitrr.train_temp_q3_table_Hindi_is_vp_skip to gs://query_runner_results/train_temp_q3_table_Hindi_is_vp_skip_2021-04-06_05:02:04_119/*.csv
Downloading from gcs_folder_name train_temp_q3_table_Hindi_is_vp_skip_2021-04-06_05:02:04_119 to local




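### Scrap code (exploratory cells; they rely on kernel state left by earlier runs and are not part of the pipeline above)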

In [21]:
# NOTE(review): scratch cell. `rating_def` leaks from the training loop, and `end_time`
# is never defined at top level in this notebook, so this raises NameError on a fresh kernel.
sql = readSqlFile("./queries/video/query0.sql", 
                  lang = "Odia", rating_def = rating_def, 
                  end_time=end_time)

In [None]:
# NOTE(review): `temp_q1_table_path` is only a local inside
# collect_and_prepare_data_with_base_table, so this name is undefined at top level.
client.delete_table(temp_q1_table_path)

In [44]:
# Older layout: ./train_test_data/<rating_def>/<lang>. The training loop writes to
# ./train_test_data_models/<lang>/<rating_def> instead.
pathlib.Path(f"./train_test_data/{rating_def}/{lang}").mkdir(parents = True, exist_ok = True)

In [45]:
# NOTE(review): `temp_q3_table_path` is undefined at top level (hidden state). This
# pulls the whole table into memory as a DataFrame.
sql = f"""
SELECT * FROM `{temp_q3_table_path}`
"""
data_df = client.query(sql).to_dataframe()

In [52]:
# Random 80/20 split, seeded with RANDOM_SEED. Only test_df is written below; train_df is not.
train_df, test_df = train_test_split(data_df, test_size = 0.2, random_state=RANDOM_SEED)

In [53]:
save_path = f"./train_test_data/{rating_def}/{lang}"
pathlib.Path(save_path).mkdir(parents = True, exist_ok = True)
# NOTE(review): sep="\n" yields one row per line only if data_df has a single
# column; with several columns each value lands on its own line. Confirm.
test_df.to_csv(os.path.join(save_path, "test.txt"), sep="\n", header = False, index=False)

In [59]:
# Hard-coded Odia/is_vp_succ2 paths. train.txt is not produced by the cells above, and
# model files/logs go to ./out relative to the working directory.
! ./xlearn_train ./train_test_data/is_vp_succ2/Odia/train.txt -v ./train_test_data/is_vp_succ2/Odia/test.txt -x auc -s 2 -k 32 -m out/model.out -t out/model.txt -b 0.001 --disk 2>&1 | tee out/logs

[32m[1m----------------------------------------------------------------------------------------------
           _
          | |
     __  _| |     ___  __ _ _ __ _ __
     \ \/ / |    / _ \/ _` | '__| '_ \ 
      >  <| |___|  __/ (_| | |  | | | |
     /_/\_\_____/\___|\__,_|_|  |_| |_|

        xLearn   -- 0.44 Version --
----------------------------------------------------------------------------------------------

[39m[0m[32m[------------] [0mxLearn uses 64 threads for training task.
[32m[1m[ ACTION     ] Read Problem ...[0m
[32m[------------] [0mNumber of Feature: 517707
[32m[------------] [0mNumber of Field: 2
[32m[------------] [0mTime cost for reading problem: 23.14 (sec)
[32m[1m[ ACTION     ] Initialize model ...[0m
[32m[------------] [0mModel size: 256.74 MB
[32m[------------] [0mTime cost for model initial: 0.34 (sec)
[32m[1m[ ACTION     ] Start to train ...[0m
[32m[------------][0m Epoch      Train log_loss       Test log_loss            Test AUC   

In [72]:
# NOTE(review): executed after In [63] below (see the execution counts).
# `model_output_path` and `save_path` come from that cell or leak from the training
# loop. The backslash-newlines are string line continuations, so `cmd` is a single
# line (see the In [73] output).
cmd = f"./xlearn_train {save_path}/train.txt \
-v {save_path}/test.txt -x auc -s 2 -k 32 -m {model_output_path}/model.out \
-t {model_output_path}/model.txt -b 0.001 --disk 2>&1 | tee \
{model_output_path}/logs"

In [63]:
# Create the xLearn output folder under the current `save_path`.
model_output_path = os.path.join(save_path, "out")
pathlib.Path(model_output_path).mkdir(parents = True, exist_ok = True)

In [62]:
# Show the folder the xLearn model files are written to. The variable set by the
# cell above (and by the training loop) is `model_output_path`; `model_out_path`
# is never defined.
model_output_path

In [73]:
# Display the assembled xLearn shell command (built in In [72]) as a sanity check.
cmd

'./xlearn_train ./train_test_data/is_vp_succ2/Odia/train.txt -v ./train_test_data/is_vp_succ2/Odia/test.txt -x auc -s 2 -k 32 -m ./train_test_data/is_vp_succ2/Odia/out/model.out -t ./train_test_data/is_vp_succ2/Odia/out/model.txt -b 0.001 --disk 2>&1 | tee ./train_test_data/is_vp_succ2/Odia/out/logs'

In [74]:
# Scratch check that os.system can run shell commands from this kernel. It creates an
# empty check_file.txt in the working directory; a return value of 0 means success.
os.system("touch check_file.txt")

0

In [43]:
# NOTE(review): the saved output lists a per-label q0 table
# ("train_temp_q0_table_Odia_is_vp_succ2"), which the current
# collect_and_prepare_data_with_base_table does not create. The output is from an older version.
train_delete_table_paths

['maximal-furnace-783.rohitrr.train_temp_q0_table_Odia_is_vp_succ2',
 'maximal-furnace-783.rohitrr.train_temp_q1_table_Odia_is_vp_succ2',
 'maximal-furnace-783.rohitrr.train_temp_q2_table_Odia_is_vp_succ2',
 'maximal-furnace-783.rohitrr.train_temp_q3_table_Odia_is_vp_succ2']

In [17]:
import csv

In [53]:
# Number of data rows to peek at. `rows` accumulates the rows read by In [57];
# re-run this cell to reset it.
count = 10
rows = []

In [57]:
# NOTE(review): this path is <rating_def>/<lang>, while the training loop writes to
# ./train_test_data_models/<lang>/<rating_def>, so the file may come from an older layout.
# NOTE(review): download_large_table_to_gcs exports with field_delimiter="\t". For files
# produced by the current code, delimiter="," yields one-element rows (cf. the tab in
# the In [37] output).
with open("./train_test_data_models/is_vp_succ2/Kannada/user_post_ffm_mapping.csv") as csv_file:
    csv_reader = csv.reader(csv_file, delimiter = ",")
    i = 0
    for row in csv_reader:
        # Appends to the shared `rows`; re-running without In [53] keeps growing it.
        rows.append(row)
        i+=1
        # Stops after count + 1 rows: the first line (header) plus `count` more.
        if(i > count):
            break

In [58]:
# rows[0] is the file's first line; rows[1] is the line after it.
rows[1]

['1_post_1000004482', '1']

In [36]:
# NOTE(review): this handle is never closed; prefer `with open(...) as f:`.
f = open("./train_test_data_models/is_vp_succ2/Kannada/user_post_ffm_mapping.csv")

In [37]:
# Peek at the first raw line of the file (the output shows it is tab-separated).
next(f)

'1_post_1000004482\t1\n'