#### Initial Setup

In [None]:
#imported libraries
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
import boto
import numpy as np
from numpy import matlib
import scipy as sp
import math
import matplotlib.pyplot as plt
import langdetect
import datetime
%matplotlib inline  
from sklearn.svm import SVR
from sklearn import linear_model
from sklearn.feature_selection import RFE
from sklearn.svm import SVR
from sklearn.cross_validation import KFold
from sklearn.cross_validation import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import AdaBoostRegressor
from sklearn.preprocessing import Imputer

import os
import findspark; findspark.init()
import pyspark
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql.types import Row
import pyspark.sql.functions as func
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.util import MLUtils

from pyspark.mllib.linalg import Vectors 
from pyspark.mllib.regression import LabeledPoint  
from pyspark.mllib.regression import LinearRegressionWithSGD 

# Ask spark-submit to pull in the spark-csv package; this has to be set before
# the SparkContext below is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
  "--packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell"
)

# NOTE: driver memory is not configured from this notebook; add
# "SPARK_DRIVER_MEMORY=5G" to ./conf/spark-env.sh before starting.

# Create the SparkContext with 3g of executor memory.
# Any exception is reported as "context already exists", which is what happens
# when the cell is re-run in a live kernel.
# NOTE(review): if creation fails for another reason and `sc` was never
# defined, the SQLContext line below raises NameError.
try:
    conf = SparkConf().set("spark.executor.memory", "3g")
    sc = pyspark.SparkContext(conf=conf)
except Exception as e:
    print "SparkContext exists... Continuing on."
    
# SQL entry point used by every query and UDF registration in this notebook.
sqlCtx = pyspark.sql.SQLContext(sc)
# Checkpoint files are written to the relative directory 'checkpoint/'.
sc.setCheckpointDir('checkpoint/')

#### Define the generator matrix

In [None]:
# Load the training targets (as a Spark DataFrame) and the GDP table (as a
# pandas DataFrame). The S3 copies are tried first; if anything in that attempt
# raises, both are re-read from the working directory instead.
try:
    output = (
        sqlCtx.read
        .format('com.databricks.spark.csv')
        .options(header='true', inferSchema='true')
        .load('s3n://cs341bucket1/Data/train_final_downloads.csv')
        .drop('')
    )
    gdp_countries_pd = pd.read_csv('s3://cs341bucket1/Data/GDP.csv')
except:
    output = (
        sqlCtx.read
        .format('com.databricks.spark.csv')
        .options(header='true', inferSchema='true')
        .load('train_final_downloads.csv')
        .drop('')
    )
    gdp_countries_pd = pd.read_csv('GDP.csv')

# Replace the '-' in the target column name with '_'.
output = output.withColumnRenamed(
    "cumulative_downloads_2016-02",
    "cumulative_downloads_2016_02",
)

    
def get_log_week(*args):
    """Return the natural log of the mean of the non-zero values in ``args``.

    Negative values (e.g. the -1 missing marker) are treated as 0 and, like
    genuine zeros, are left out of the mean.  The function is called with the
    seven daily columns of one week, but it works for any number of columns.

    Returns:
        float: ``log(sum / number_of_non_zero_values)``, or ``0.0`` when every
        value is zero or negative (or when no value is given).
    """
    values = [0 if x < 0 else x for x in args]
    # Count the usable days from the actual input length rather than assuming 7.
    active_days = len(values) - values.count(0)
    if active_days == 0:
        return float(0)
    return math.log(1.0 * sum(values) / active_days)
    #Y = [np.log(c) for c in args if c > 0]
    #if len(Y) == 0 : return 0
    #return float(1.0*sum(Y))/(7-nb_0)
# Make get_log_week callable from Spark SQL; the declared return type is float.
sqlCtx.registerFunction("get_log_week", get_log_week,returnType=FloatType())
def get_download_sum(*args):
    """Return the sum of ``args`` as a float, counting every -1 as 0."""
    total = 0
    for value in args:
        # -1 is the missing-value marker and contributes nothing.
        total += 0 if value == -1 else value
    return 1.0 * total
# Make get_download_sum callable from Spark SQL; the declared return type is float.
sqlCtx.registerFunction("get_download_sum", get_download_sum,returnType=FloatType())


def get_coefficients(*args):
    """Return the leading coefficient of a polynomial fit to cumulative downloads.

    Args:
        *args: ``args[0]`` is the polynomial degree; ``args[1:]`` is the daily
            series.  Values equal to -1 (missing) are replaced by 0.

    Returns:
        float: highest-order coefficient of ``np.polyfit`` applied to the
        cumulative sum of the series against the day index 0..n-1.
    """
    cleaned = [0 if x == -1 else x for x in args]
    degree = cleaned[0]
    series = cleaned[1:]
    # Size the x-axis from the series itself instead of assuming 56 days.
    days = range(len(series))
    return float(np.polyfit(days, np.cumsum(series), degree)[0])
    #Y = [np.log(c) for c in args[1:] if c>0]
    #if len(Y)<=1: return float(0)
    #return  float(np.polyfit(range(len(Y)),np.cumsum(Y),args[0])[0])

#Generate the step max and min 
def get_maxStep(maximum,*args):
    """Return the largest value, or the smallest positive value, of ``args``.

    Args:
        maximum: truthy to return ``max`` of the series, falsy to return the
            minimum over the strictly positive values.
        *args: the series; -1 (missing) is treated as 0.

    Returns:
        The selected value, or integer ``0`` when the series has no non-zero
        entry (or, for the minimum, no positive entry).  The zero is an int so
        that it matches the IntegerType this function is registered with.
    """
    values = [0 if x == -1 else x for x in args]
    if not any(values):
        return 0
    if maximum:
        return max(values)
    positives = [v for v in values if v > 0]
    # Guard against min() on an empty list when all non-zero values are negative.
    return min(positives) if positives else 0


def get_maxStep_old(maximum,*args):
    """Return the largest rise or the largest drop between consecutive days.

    Only pairs where both days are non-zero are considered; -1 (missing) is
    treated as 0.  The search starts from 0, so the result is never negative
    when ``maximum`` is truthy and never positive when it is falsy.

    Args:
        maximum: truthy for the largest positive step, falsy for the most
            negative step.
        *args: the daily series, of any length.

    Returns:
        float: the selected day-to-day difference (0.0 when no pair qualifies).
        A float is returned because the function is registered as FloatType.
    """
    values = [0 if x == -1 else x for x in args]
    best = 0
    # Walk adjacent pairs of the actual series rather than assuming 56 days.
    for previous, current in zip(values, values[1:]):
        if previous == 0 or current == 0:
            continue
        step = current - previous
        if maximum and best < step:
            best = step
        if not maximum and best > step:
            best = step
    return float(best)
    #if m==0: return float(0)
    #return m

def get_std(*args):
    """Return the population standard deviation of ``args`` with -1 counted as 0."""
    cleaned = []
    for value in args:
        cleaned.append(0 if value == -1 else value)
    return float(np.std(cleaned))

def get_nbMissing(*args):
    """Return how many of ``args`` equal the missing-value marker -1."""
    missing = 0
    for value in args:
        if value == -1:
            missing += 1
    return missing

# Revenue and usage time-series features, followed by the daily download average

def get_revenue_coefficients(*args):
    """Return the leading coefficient of a polynomial fit to log-revenue.

    ``args[0]`` is the polynomial degree.  Of the remaining values only the
    strictly positive ones are kept; they are log-transformed and fitted against
    their position 0..k-1 in the filtered series.  With fewer than two usable
    points the result is 0.0.
    """
    degree = args[0]
    log_series = []
    for value in args[1:]:
        if value > 0:
            log_series.append(np.log(value))
    if len(log_series) < 2:
        return float(0)
    positions = np.array(range(len(log_series)))
    fit = np.polyfit(positions, log_series, degree)
    return float(fit[0])

def get_usage_coefficients(*args):
    """Return the leading coefficient of a polynomial fit to a usage series.

    Args:
        *args: ``args[0]`` is the polynomial degree; ``args[1:]`` is the series
            (any length; the notebook passes 8 columns).

    Returns:
        float: highest-order coefficient of ``np.polyfit`` of the series against
        its index, or ``0.0`` if any argument equals -1 (missing data).  The
        zero is a float so every path matches the FloatType registration.
    """
    args = list(args)
    if -1 in args:
        return float(0)
    series = args[1:]
    # Derive the x-axis from the series length instead of assuming 8 points.
    return float(np.polyfit(range(len(series)), series, args[0])[0])

def get_usage_max(*args):
    """Return the maximum of the usage series, ignoring missing (-1) values.

    ``args[0]`` is ignored (callers pass a placeholder so the call shape matches
    the coefficient helpers); ``args[1:]`` is the series.

    Returns:
        float: the maximum of the non-missing values, or ``0.0`` when none
        remain.  The empty case returns a float so that every path matches the
        FloatType this function is registered with.
    """
    observed = [c for c in args[1:] if c != -1]
    if not observed:
        return float(0)
    return float(np.max(observed))

def get_usage_mean(*args):
    """Return the mean of the usage series, ignoring missing (-1) values.

    ``args[0]`` is ignored (callers pass a placeholder so the call shape matches
    the coefficient helpers); ``args[1:]`` is the series.

    Returns:
        float: the mean of the non-missing values, or ``0.0`` when none remain.
        The empty case returns a float so that every path matches the FloatType
        this function is registered with.
    """
    observed = [c for c in args[1:] if c != -1]
    if not observed:
        return float(0)
    return float(np.mean(observed))

def get_revenue_max(*args):
    """Return the maximum log-revenue over the strictly positive values.

    ``args[0]`` is ignored; ``args[1:]`` is the series.  Values that are not
    strictly positive are dropped before taking logs.  With fewer than two
    usable values the result is 0.0.
    """
    log_series = []
    for value in args[1:]:
        if value > 0:
            log_series.append(np.log(value))
    if len(log_series) < 2:
        return float(0)
    return float(np.max(log_series))

def get_revenue_mean(*args):
    """Return the mean log-revenue over the strictly positive values.

    ``args[0]`` is ignored; ``args[1:]`` is the series.  Values that are not
    strictly positive are dropped before taking logs.  With fewer than two
    usable values the result is 0.0.
    """
    log_series = []
    for value in args[1:]:
        if value > 0:
            log_series.append(np.log(value))
    if len(log_series) < 2:
        return float(0)
    return float(np.mean(log_series))

def get_dailyAvg(*args):
    """Return the mean of the non-zero values in ``args`` (-1 counts as 0).

    Returns 0.0 when no value is non-zero.
    """
    values = [0 if x == -1 else x for x in args]
    active_days = np.count_nonzero(values)
    if active_days == 0:
        return float(0)
    total = 1.0 * sum(values)
    return total / active_days
    #return float(sum([np.log(c) for c in args if c>0]))/np.count_nonzero(args)

# Expose the feature helpers to Spark SQL.  Each entry is
# (SQL name, Python function, declared return type); they are registered in
# the order listed.
_FEATURE_UDFS = [
    ("get_nbMissing", get_nbMissing, IntegerType()),
    ("get_std", get_std, FloatType()),
    ("get_maxStep", get_maxStep, IntegerType()),
    ("get_maxStep_old", get_maxStep_old, FloatType()),
    ("get_coefficients", get_coefficients, FloatType()),
    ("daily_avg", get_dailyAvg, FloatType()),
    ("get_usage_coefficients", get_usage_coefficients, FloatType()),
    ("get_revenue_coefficients", get_revenue_coefficients, FloatType()),
    ("get_usage_max", get_usage_max, FloatType()),
    ("get_usage_mean", get_usage_mean, FloatType()),
    ("get_revenue_max", get_revenue_max, FloatType()),
    ("get_revenue_mean", get_revenue_mean, FloatType()),
]
for _udf_name, _udf_func, _udf_type in _FEATURE_UDFS:
    sqlCtx.registerFunction(_udf_name, _udf_func, returnType=_udf_type)

# Language codes that get their own indicator column; anything else is "other".
lang = ['ja','zh-cn','ko','en','other']
def get_language(x):
    """Return the detected language of ``x`` restricted to the codes in ``lang``.

    Args:
        x: the text to classify, either a byte string (decoded as UTF-8 with
            undecodable bytes dropped) or an already-decoded text string.

    Returns:
        str: the top ``langdetect`` candidate's code when its probability is at
        least 0.7 and the code is listed in ``lang``; otherwise ``"other"``.
        Any error during decoding or detection also yields ``"other"``.
    """
    try:
        # Decode only real byte strings: calling .decode() on text that is
        # already unicode fails for non-ASCII names, which previously pushed
        # such names into "other" through the except branch.
        if isinstance(x, bytes):
            x = x.decode('utf8', 'ignore')
        detected = langdetect.detect_langs(x)[0]
    except Exception:
        # Best effort: undetectable or malformed input is simply "other".
        return "other"
    if detected.prob < 0.7 or detected.lang not in lang:
        return "other"
    return detected.lang
# Make get_language callable from Spark SQL; the declared return type is string.
sqlCtx.registerFunction("get_language", get_language,returnType=StringType())

def get_days(date, id):
    """Return the number of days between the release date and 2015-03-01.

    As written, the date is never parsed: the ``or True`` switch below sends
    every row through the imputation branch, which evaluates a cubic polynomial
    in ``id / 1e8`` and truncates it to an int.  The date-parsing branch
    (``%m/%d/%Y`` first, then ``%Y-%m-%d``) is kept so the switch can be
    turned off again.
    """
    # Deliberate override: imputation is applied to every app, dated or not.
    impute = (date == "no_date") or True
    if impute:
        # return 0
        id = 1.0*id/100000000
        return int(5856.25394104 -1731.74798728*id+195.553086*id**2  -8.12861635*id**3)

    reference = datetime.datetime.strptime('03/01/2015', '%m/%d/%Y').date()
    try:
        released = datetime.datetime.strptime(date, '%m/%d/%Y').date()
    except:
        released = datetime.datetime.strptime(date, '%Y-%m-%d').date()
    return (reference - released).days
# Make get_days callable from Spark SQL; the declared return type is integer.
sqlCtx.registerFunction("get_days", get_days,returnType=IntegerType())

# escape() normalises names that contain '-' or a space so they can be used as identifiers
def escape(text):
    """Return ``text`` with every space and every '-' replaced by '_'."""
    for separator in (" ", "-"):
        text = text.replace(separator, "_")
    return text
# 1/0 flag for reviews dated on or before 2015-04-01 (summed per app later)
def get_recentReviews(date):
    """Return 1 if ``date`` ('%Y-%m-%d') is on or before 2015-04-01, else 0."""
    cutoff = datetime.datetime.strptime('04/01/2015', '%m/%d/%Y').date()
    review_day = datetime.datetime.strptime(date, '%Y-%m-%d').date()
    days_before_cutoff = (cutoff - review_day).days
    return int(days_before_cutoff >= 0)

# Per-country market-size figures, keyed by escaped country name (spaces
# replaced by '_').  Source noted by the original author:
# http://blog.nelso.com/2010/06/iphone-os-penetration-by-country.html
# Built once at import time instead of on every call.
_IPHONE_MARKET = {
    "United_States" : 10683403,
    "France" : 2248817,
    "Japan" : 1378903,
    "Spain" : 377346,
    "United_Kingdom" : 2551128,
    "Germany" : 1117716,
    "Hong_Kong" : 299720,
    "Switzerland" : 399364,
    "Netherlands" : 372539,
    "Australia" : 1207428,
    "Norway" : 154218,
    "Sweden" : 281622,
    "China" : 725358,
    "Canada" : 919074,
    "Denmark" : 151426,
    "Italy" : 648718,
    "Taiwan" : 174226,
    "Mexico" : 215326,
    "Austria" : 156322,
    "Brazil" : 219339,
    "Poland" : 72114,
    "Singapore" : 402922,
    "Hungary" : 33219,
    "Czech_Republic" : 42753,
    'South_Korea': 530235,
    "Russia" : 246421,
}

# Figures used for every device other than "iphone".
_OTHER_DEVICE_MARKET = {
    "United_States" : 223269,
    "France" : 2724,
    "Japan" : 2293,
    "Spain" : 1494,
    "United_Kingdom" : 4197,
    "Germany" : 3403,
    "Hong_Kong" : 2306,
    "Switzerland" : 1698,
    "Netherlands" : 2554,
    "Australia" : 1400,
    "Norway" : 1333,
    "Sweden" : 1188,
    "China" : 12516,
    "Canada" : 6275,
    "Denmark" : 753,
    "Italy" : 1370,
    "Taiwan" : 1356,
    "Mexico" : 3380,
    "Austria" : 493,
    "Brazil" : 2014,
    "Poland" : 324,
    "Singapore" : 1453,
    "Hungary" : 211,
    "Czech_Republic" : 203,
    'South_Korea': 2416,
    "Russia" : 2183,
}


def get_market(country,device):
    """Return the market-size figure for ``country`` on ``device``.

    Args:
        country: escaped country name (e.g. ``"United_States"``).
        device: ``"iphone"`` selects the iPhone table; any other value selects
            the second table.

    Returns:
        int: the tabulated figure, or the fallback for unlisted countries
        (15000 for iPhone, 100 otherwise).
    """
    if device == "iphone":
        return _IPHONE_MARKET.get(country, 15000)
    return _OTHER_DEVICE_MARKET.get(country, 100)
# Make get_market callable from Spark SQL; the declared return type is integer.
sqlCtx.registerFunction("get_market", get_market,returnType=IntegerType())

def get_gdp(country):
    """Return the ``GDP2015`` value for ``country`` from ``gdp_countries_pd``.

    Args:
        country: value matched against the ``Country`` column of the
            module-level GDP table; ``"no_country"`` marks a missing country.

    Returns:
        float: the GDP figure of the first matching row, or ``0.0`` when the
        country is ``"no_country"`` or has no row in the table.
    """
    if country == "no_country":
        return 0.0
    matches = gdp_countries_pd.loc[gdp_countries_pd.Country == country, "GDP2015"]
    # A country absent from the table used to raise inside float(); treat it
    # like a missing country instead of failing the whole Spark job.
    if len(matches) == 0:
        return 0.0
    return float(matches.iloc[0])
# Make get_gdp callable from Spark SQL; the declared return type is float.
sqlCtx.registerFunction("get_gdp", get_gdp,returnType=FloatType())
    
def generate_predictors(down, rati, usag, reve, prev, rele, raco, revi ):
    """Build the per-app feature table from the raw CSV files.

    Every argument is a CSV file name.  The files are first read from
    ``s3n://cs341bucket1/Data/`` (``s3://`` for the pandas read); if anything in
    that attempt raises, all of them are re-read using the names as local paths.

    Args:
        down: daily downloads (id, name, category, device and the date columns
            that are renamed below).
        rati: app ratings (columns start1, star2..star5 are used).
        usag: usage metrics (has ``category`` and ``metric`` columns).
        reve: revenues (has ``category`` and ``device`` columns).
        prev: cumulative downloads up to 2015-02.
        rele: release dates (``release_date`` column).
        raco: ratings by country (star1..star5 per id).
        revi: reviews, read with pandas and given the schema declared below.

    Returns:
        pandas.DataFrame: one row per result of the joins, sorted by ``id``,
        with nulls/NaN replaced by 0.

    Relies on the module-level ``sc``, ``sqlCtx``, ``lang``, ``escape``,
    ``get_recentReviews`` and on the UDFs registered with ``sqlCtx``.
    """
    ###load files : 
    # First attempt: read everything from S3 using the AWS credentials found in
    # the environment.  Any failure (missing credentials included) falls through
    # to the local-file branch below.
    try:
        sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", os.environ['AWS_ACCESS_KEY_ID'])
        sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", os.environ['AWS_SECRET_ACCESS_KEY'])
        downloads = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+down).drop('')
        ratings = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+rati).drop('')
        usages = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+usag).drop('')
        revenues = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+reve).drop('')
        prev_downloads = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+prev).drop('')  
        release_date = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+rele).drop('')
        rating_country = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load('s3n://cs341bucket1/Data/'+raco).drop('')

        # Reviews are parsed with pandas and then converted to a Spark
        # DataFrame with an explicit schema.
        reviews_schema = StructType([
            StructField("id",IntegerType(),True),
            StructField("name",StringType(),True),
            StructField("country",StringType(),True),
            StructField("rating",IntegerType(),True),
            StructField("date",StringType(),True),
            StructField("title",StringType(),True),
            StructField("version",StringType(),True),
            StructField("text",StringType(),True),
            StructField("reviewer",StringType(),True)
        ])
        reviews = pd.read_csv('s3://cs341bucket1/Data/'+revi)
        reviews = sqlCtx.createDataFrame(reviews,reviews_schema)

    # Fallback: same reads, treating each argument as a local path.
    # NOTE(review): the bare except hides the reason the S3 attempt failed.
    except:
        downloads = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(down).drop('')
        ratings = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(rati).drop('')
        usages = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(usag).drop('')
        revenues = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(reve).drop('')
        prev_downloads = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(prev).drop('')  
        release_date = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(rele).drop('')

        rating_country = sqlCtx.read \
            .format('com.databricks.spark.csv') \
            .options(header='true',inferSchema='true') \
            .load(raco).drop('')

        reviews_schema = StructType([
            StructField("id",IntegerType(),True),
            StructField("name",StringType(),True),
            StructField("country",StringType(),True),
            StructField("rating",IntegerType(),True),
            StructField("date",StringType(),True),
            StructField("title",StringType(),True),
            StructField("version",StringType(),True),
            StructField("text",StringType(),True),
            StructField("reviewer",StringType(),True)
        ])
        reviews = pd.read_csv(revi)
        reviews = sqlCtx.createDataFrame(reviews,reviews_schema)

    ###imputations
    #usage imputation
    # Replace -1 by the column median, computed separately for every
    # (category, metric) group.  The first 6 columns are left untouched and
    # columns from index 6 onward are imputed.
    imputer = Imputer(missing_values=-1, strategy='median', axis=0)
    pd_usages = usages.toPandas()
    category = list(set(pd_usages["category"].values))
    imp = pd.DataFrame(columns = pd_usages.columns)
    for cat in category:
        #for dev in ["iphone","ipad"]:
            for metric in range(1,5):
                curr_df = pd_usages.ix[pd_usages["category"]==cat,:]
                #curr_df = curr_df.ix[curr_df["device"]==dev,:]
                curr_df = curr_df.ix[curr_df["metric"]==metric,:]
                name = curr_df.columns
                df1 = curr_df.ix[:,0:6]
                df2 = pd.DataFrame(imputer.fit_transform(curr_df.ix[:,6:]))
                # Re-align the imputed values with the original row labels
                # before gluing the two halves back together.
                df2.index = df1.index
                curr_df = pd.concat([df1,df2],axis = 1)
                curr_df.columns = name 
                imp = pd.concat([imp,curr_df],axis = 0)
    usages = sqlCtx.createDataFrame(imp)

    #revenue imputation
    # Same idea per (category, device) group; the first 5 columns are kept and
    # columns from index 5 onward are imputed.  `category` is the list derived
    # from the usage table above.
    pd_revenues = revenues.toPandas()
    imp = pd.DataFrame(columns = revenues.columns)
    for cat in category:
        for dev in ["iphone","ipad"]:
            #for metric in range(1,5):
                curr_df = pd_revenues.ix[pd_revenues["category"]==cat,:]
                curr_df = curr_df.ix[curr_df["device"]==dev,:]
                #curr_df = curr_df.ix[curr_df["metric"]==metric,:]
                name = curr_df.columns
                df1 = curr_df.ix[:,0:5]
                df2 = pd.DataFrame(imputer.fit_transform(curr_df.ix[:,5:]))
                df2.index = df1.index
                curr_df = pd.concat([df1,df2],axis = 1)
                curr_df.columns = name 
                imp = pd.concat([imp,curr_df],axis = 0)
    revenues = sqlCtx.createDataFrame(imp)

    ### Renaming
    # Rename the 56 daily columns starting 2015-03-01 from 'YYYY-MM-DD' to
    # 'MM_DD_YYYY' so the names contain no '-' when spliced into SQL text.
    old_dateRange = pd.date_range('03/01/2015', periods=56).format(formatter=lambda x: x.strftime('%Y-%m-%d'))
    dateRange = pd.date_range('03/01/2015', periods=56).format(formatter=lambda x: x.strftime('%m_%d_%Y'))
    for d in range(56):
        revenues = revenues.withColumnRenamed(old_dateRange[d],dateRange[d])
        usages = usages.withColumnRenamed(old_dateRange[d],dateRange[d])
        downloads = downloads.withColumnRenamed(old_dateRange[d],dateRange[d])  
    prev_downloads = prev_downloads.withColumnRenamed("cumulative_downloads_2015-02","cumulative_downloads_2015_02")
    
    #Initialization
    # NOTE(review): this value is overwritten by the SQL query a few lines below.
    predictors = downloads['id','name','category','device']
    
    # Generate the weekly downloads

    sqlCtx.registerDataFrameAsTable(downloads, "downloads")
    sqlCtx.registerDataFrameAsTable(usages, "usages")
    sqlCtx.registerDataFrameAsTable(revenues, "revenues")



    # week_1..week_8: get_log_week over each block of 7 daily columns;
    # download_sum: total over all 56 days.
    predictors = sqlCtx.sql("SELECT id, name, category, device, \
               get_log_week("+ ",".join(dateRange[0:7])+") AS week_1 \
                ,get_log_week("+",".join(dateRange[7:14])+") AS week_2 \
                ,get_log_week("+ ",".join(dateRange[14:21])+") AS week_3 \
               ,get_log_week("+",".join(dateRange[21:28])+") AS week_4 \
               ,get_log_week("+",".join(dateRange[28:35])+") AS week_5 \
               ,get_log_week("+",".join(dateRange[35:42])+") AS week_6 \
               ,get_log_week("+",".join(dateRange[42:49])+") AS week_7 \
               ,get_log_week("+",".join(dateRange[49:56])+") AS week_8\
               ,get_download_sum("+ ",".join(dateRange)+") AS download_sum \
               from downloads")
    sqlCtx.registerDataFrameAsTable(predictors, "predictors")

    ###Usages
    # One table per usage metric (1..4) so each gets its own feature columns.
    m1 = sqlCtx.sql("SELECT * FROM usages WHERE metric = 1")
    m2 = sqlCtx.sql("SELECT * FROM usages WHERE metric = 2")
    m3 = sqlCtx.sql("SELECT * FROM usages WHERE metric = 3")
    m4 = sqlCtx.sql("SELECT * FROM usages WHERE metric = 4")
    sqlCtx.registerDataFrameAsTable(m1,"m1")
    sqlCtx.registerDataFrameAsTable(m2,"m2")
    sqlCtx.registerDataFrameAsTable(m3,"m3")
    sqlCtx.registerDataFrameAsTable(m4,"m4")
    #sqlCtx.registerDataFrameAsTable(avg_score,"avg_score")
    
    ### Make coefficients

    # Shape features of the 56-day download series: polynomial coefficients of
    # degree 0..3, step extremes, standard deviation, missing-day count and the
    # daily average.
    temp_downloads = sqlCtx.sql("SELECT id,name,category, device \
    , get_coefficients(0,"+",".join(dateRange)+") AS coef_0 \
    ,get_coefficients(1,"+",".join(dateRange)+") AS coef_1 \
    ,get_coefficients(2,"+",".join(dateRange)+") AS coef_2 \
    ,get_coefficients(3,"+",".join(dateRange)+") AS coef_3 \
    ,get_maxStep(True,"+",".join(dateRange)+") AS max_step \
    ,get_maxStep(False,"+",".join(dateRange)+") AS min_step \
    ,get_maxStep_old(True,"+",".join(dateRange)+") AS max_step_old \
    ,get_maxStep_old(False,"+",".join(dateRange)+") AS min_step_old \
    ,get_std("+",".join(dateRange)+") AS downloads_std \
    ,get_nbMissing("+",".join(dateRange)+") AS nb_missing \
    ,daily_avg(" + ",".join(dateRange[0:56]) + ") AS daily_avg \
     FROM downloads")

    predictors = predictors.join(temp_downloads,["id","name","category","device"],how='left_outer')

    # Usage features per metric, computed over columns 5..12 of each table.
    # NOTE(review): the imputation above starts at column index 6 while these
    # slices start at index 5 -- confirm the intended first column.
    temp_m1 = sqlCtx.sql("SELECT id, name, category, device, \
    get_usage_coefficients(0,"+",".join(m1.columns[5:13])+") AS m1_coef_0, \
    get_usage_coefficients(1,"+",".join(m1.columns[5:13])+") AS m1_coef_1, \
    get_usage_coefficients(2,"+",".join(m1.columns[5:13])+") AS m1_coef_2, \
    get_usage_max(0,"+",".join(m1.columns[5:13])+") AS m1_max, \
    get_usage_mean(0,"+",".join(m1.columns[5:13])+") AS m1_mean FROM m1")

    temp_m2 = sqlCtx.sql("SELECT id, name, category, device, \
    get_usage_coefficients(0,"+",".join(m2.columns[5:13])+") AS m2_coef_0, \
    get_usage_coefficients(1,"+",".join(m2.columns[5:13])+") AS m2_coef_1, \
    get_usage_coefficients(2,"+",".join(m2.columns[5:13])+") AS m2_coef_2, \
    get_usage_max(0,"+",".join(m2.columns[5:13])+") AS m2_max, \
    get_usage_mean(0,"+",".join(m2.columns[5:13])+") AS m2_mean FROM m2")

    temp_m3 = sqlCtx.sql("SELECT id, name, category, device, \
    get_usage_coefficients(0,"+",".join(m3.columns[5:13])+") AS m3_coef_0, \
    get_usage_coefficients(1,"+",".join(m3.columns[5:13])+") AS m3_coef_1, \
    get_usage_coefficients(2,"+",".join(m3.columns[5:13])+") AS m3_coef_2, \
    get_usage_max(0,"+",".join(m3.columns[5:13])+") AS m3_max, \
    get_usage_mean(0,"+",".join(m3.columns[5:13])+") AS m3_mean FROM m3")

    temp_m4 = sqlCtx.sql("SELECT id, name, category, device, \
    get_usage_coefficients(0,"+",".join(m4.columns[5:13])+") AS m4_coef_0, \
    get_usage_coefficients(1,"+",".join(m4.columns[5:13])+") AS m4_coef_1, \
    get_usage_coefficients(2,"+",".join(m4.columns[5:13])+") AS m4_coef_2, \
    get_usage_max(0,"+",".join(m4.columns[5:13])+") AS m4_max, \
    get_usage_mean(0,"+",".join(m4.columns[5:13])+") AS m4_mean FROM m4")

    # Revenue features over every column from index 4 onward.
    # NOTE(review): the revenue imputation above starts at column index 5 while
    # this slice starts at index 4 -- confirm the intended first column.
    temp_revenues = sqlCtx.sql("SELECT id, name, category, device, \
    get_revenue_coefficients(0,"+",".join(revenues.columns[4:])+") AS rev_coef_0, \
    get_revenue_coefficients(1,"+",".join(revenues.columns[4:])+") AS rev_coef_1, \
    get_revenue_coefficients(2,"+",".join(revenues.columns[4:])+") AS rev_coef_2, \
    get_revenue_max(0,"+",".join(revenues.columns[4:])+") AS rev_max, \
    get_revenue_mean(0,"+",".join(revenues.columns[4:])+") AS rev_mean FROM revenues")


    predictors = predictors.join(temp_revenues,["id", "name", "category","device"],how='left_outer')
    predictors = predictors.join(temp_m1,["id", "name", "category","device"],how='left_outer')
    predictors = predictors.join(temp_m2,["id", "name", "category","device"],how='left_outer')
    predictors = predictors.join(temp_m3,["id", "name", "category","device"],how='left_outer')
    predictors = predictors.join(temp_m4,["id", "name", "category","device"],how='left_outer')
    #predictors = predictors.join(avg_score,["id", "name", "category","device"],how='left_outer')
    #predictors = predictors.join(temp_usages,["id", "name", "category","device"],how='left_outer')
    #predictors = predictors.join(,)

    # previous downloads addition
    predictors = predictors.join(prev_downloads,["id","device"],how='left_outer')
    
    # Days since release generation with imputation



    # Every downloaded id gets a row; ids without a release date are tagged
    # "no_date" before get_days is applied.
    release_date = downloads[["id"]].dropDuplicates().join(release_date,["id"],"left").fillna("no_date",["release_date"])
    sqlCtx.registerDataFrameAsTable(release_date, "release_date")

    temp_date = sqlCtx.sql("SELECT id\
    , get_days(release_date, id) AS days_since_release \
     FROM release_date")

    predictors = predictors.join(temp_date,["id"],"left")
    
    #ratings generation
    # Share of each star level among all ratings, plus the total rating count.
    # NOTE(review): the first column is spelled "start1" while the others are
    # "star2".."star5" -- presumably this mirrors the CSV header; confirm.
    sqlCtx.registerDataFrameAsTable(ratings, "ratings")
    temp_ratings = sqlCtx.sql("SELECT id,name,category \
    , CAST(1.0*start1/(start1+star2+star3+star4+star5) AS float) AS star1 \
    , CAST(1.0*star2/(start1+star2+star3+star4+star5) AS float) AS star2 \
    , CAST(1.0*star3/(start1+star2+star3+star4+star5) AS float) AS star3 \
    , CAST(1.0*star4/(start1+star2+star3+star4+star5) AS float) AS star4 \
    , CAST(1.0*star5/(start1+star2+star3+star4+star5) AS float) AS star5 \
    , (start1+star2+star3+star4+star5) AS num_ratings \
     FROM ratings")

    predictors = predictors.join(temp_ratings,["id","name","category"],"left")
    
    # Categories
    # One indicator column per distinct category, named after the category with
    # spaces replaced by '_'.
    # NOTE(review): the CASE compares `category` with the underscore form, so a
    # category whose raw name contains a space would never match -- confirm.
    list_categories = [ x.category.replace(" ","_") for x in sqlCtx.sql("SELECT category \
     FROM downloads\
     group by category \
     ").collect()]
    for cat in list_categories:
        sqlCtx.registerDataFrameAsTable(predictors, "predictors")
        predictors=sqlCtx.sql('''SELECT *, CASE WHEN (category = "'''+cat+'''") THEN 1 ELSE 0 END AS '''+cat+''' FROM predictors''')

    # Device
    sqlCtx.registerDataFrameAsTable(predictors, "predictors")
    predictors=sqlCtx.sql('''SELECT *, CASE WHEN (device = "iphone") THEN 1 ELSE 0 END AS iphone FROM predictors''')
    sqlCtx.registerDataFrameAsTable(predictors, "predictors")
    #predictors=sqlCtx.sql('''SELECT *, CASE WHEN (device = "ipad") THEN 1 ELSE 0 END AS ipad FROM predictors''')
    

    
    #Language of the title
    # One indicator column per entry of `lang` ('-' replaced by '_' in the
    # column name); get_language(name) appears in each of these queries.
    for l in lang:
        sqlCtx.registerDataFrameAsTable(predictors, "predictors")
        predictors=sqlCtx.sql('''SELECT *, CASE WHEN (get_language(name) = "'''+l+'''") THEN 1 \
        ELSE 0 END AS '''+l.replace("-","_")+''' FROM predictors''')
        
        
    #Reviews 


    #First step
    list_countries =['United_States', 'France', 'Japan', 'Spain', 'United_Kingdom','Saudi_Arabia', 'Germany'\
         , 'Hong_Kong', 'Switzerland', 'Turkey','Netherlands', 'Australia', 'Norway', 'Sweden', 'China', 'Canada'\
         ,'Tanzania', 'Denmark', 'South_Korea', 'Italy', 'Finland', 'Taiwan','Russia', 'Philippines', 'Slovenia'\
         , 'Ireland', 'Belgium', 'Mexico','Austria', 'India', 'Brazil', 'Benin', 'New_Zealand','United_Arab_Emirates'\
         , 'Ukraine', 'Poland', 'Israel', 'Portugal','Tunisia', 'Mali', 'Slovakia', 'Zimbabwe', 'Thailand', 'Panama'\
         ,'Indonesia', 'Singapore', 'Greece', 'Senegal', 'Nicaragua','Hungary', 'Czech_Republic', 'Macedonia', 'Chile'\
         , 'Uruguay','Malaysia', 'Algeria', 'Nepal', 'Mauritania', 'Croatia']

    # The three review steps below are assembled as Python source text and run
    # with `exec` so the per-star fields can be generated in a loop.
    # Map step: one (id, Row) pair per review, with counters set to 1/0 and
    # the version and country wrapped in single-element sets.
    cmd = '''review_rdd = reviews\
    .map(lambda x : (x.id , Row(id = x.id , avg_review = x.rating \
    , recent_review = get_recentReviews(x.date), nb_review = 1\
    ,version = set([x.version])'''
    cmd+=",country = set([x.country])"
    #for c in list_countries:
    #    cmd+=","+c+''' = int( escape(x.country) == "'''+c+'''")'''
    for i in range(1,6):
        cmd+=",review_rating_"+str(i)+" = int(x.rating == "+str(i)+")  "
    cmd+=")))"
    exec cmd in globals(), locals()

    #Group step
    # Reduce step: add the counters and union the version/country sets per id.
    cmd = '''review_rdd = review_rdd.reduceByKey(lambda x1 ,x2 : Row(\
     avg_review = x1.avg_review + x2.avg_review\
       ,recent_review = x1.recent_review + x2.recent_review, nb_review = x1.nb_review + x2.nb_review'''
    #for c in list_countries:
    #    cmd+=" , "+c+" = x1."+c+" + x2."+c
    cmd+=", country = x1.country.union(x2.country)"
    cmd+=", version = x1.version.union(x2.version)"
    for i in range(1,6):
        cmd+=", review_rating_"+str(i)+" = x1.review_rating_"+str(i)+" + x2.review_rating_"+str(i)+" "
    cmd+="))"
    exec cmd in globals(), locals()

    # Clean the grouped rdd
    # Turn sums into averages/shares.  Only one country is kept per app
    # (whichever element set.pop() returns), and the version feature is the
    # number of distinct versions minus one.
    cmd = '''review_rdd = review_rdd.map(lambda (id , x) : [ id \
    ,  1.0*x.avg_review /  x.nb_review\
       , x.recent_review,  x.nb_review'''
    #for c in list_countries:
    #    cmd+=" , 1.0* x."+c+"/ x.nb_review"
    cmd+=",  escape(x.country.pop())"
    cmd+=",  len(x.version) -1 " # -1 if want number of updates
    for i in range(1,6):
        cmd+=", 1.0*x.review_rating_"+str(i)+" / x.nb_review"
    cmd+="])"
    exec cmd in globals(), locals()

    #Put back into dataframe
    grp_reviews = sqlCtx.createDataFrame(review_rdd, ["id","avg_review"\
          ,"recent_reviews","nb_reviews","country","versions"]+["review_rating_"+str(i) for i in range(1,6)])
        #,"recent_reviews","nb_reviews"] + list_countries + ["versions"]+["review_rating_"+str(i) for i in range(1,6)])

    #Join with predictors 
    # Apps without any review get the placeholder country "no_country".
    predictors = predictors.join(grp_reviews,["id"],"left").fillna("no_country",["country"])
    
    # Generate DL Projection
    # Ratio features combining downloads, ratings, reviews and app age.
    # NOTE(review): in down_over_days2 only cumulative_downloads_2015_02 is
    # divided by days_since_release (twice); 7*download_sum is added undivided
    # because of operator precedence -- confirm this is intended.
    sqlCtx.registerDataFrameAsTable(predictors, "predictors")
    derived_feats = sqlCtx.sql("SELECT id, device\
        ,(7*download_sum+cumulative_downloads_2015_02) AS dl_projection \
        ,CAST((1000000.0*num_ratings/(cumulative_downloads_2015_02 + download_sum))AS float )  AS ratings_per_downloads \
        ,CAST((1.0*num_ratings/(days_since_release+56))AS float )  AS ratings_per_day \
        ,CAST((1000000.0*nb_reviews/download_sum)AS float )  AS review_per_downloads \
        ,CAST((1.0*recent_reviews/nb_reviews)AS float )  AS review_recent_over_old \
        ,CAST((1.0*nb_reviews / days_since_release)AS float )  AS review_per_day \
        ,CAST((1.0*cumulative_downloads_2015_02/(days_since_release))AS float )  AS downloads_per_day_before \
        ,CAST(1.0*download_sum/cumulative_downloads_2015_02 AS float ) AS relative_download_increase\
        ,CAST(1.0*nb_reviews / (CASE WHEN (days_since_release<56) THEN days_since_release ELSE 56 END) AS float) AS reviews_per_days\
        ,CAST(7*download_sum+cumulative_downloads_2015_02 / days_since_release / days_since_release AS float) AS down_over_days2\
        FROM predictors")
    predictors = predictors.join(derived_feats,["id","device"],"left")



    #we could group by continent or use the market potential
    # One indicator column per known country, plus one for "no_country".
    list_countries+=["no_country"]
    for co in list_countries:
        sqlCtx.registerDataFrameAsTable(predictors, "predictors")
        predictors=sqlCtx.sql('''SELECT *, CASE WHEN (country = "'''+co+'''") THEN 1 ELSE 0 END AS '''+co+''' FROM predictors''')


    sqlCtx.registerDataFrameAsTable(predictors, "predictors")
    predictors=sqlCtx.sql('''SELECT *, get_market(country,device) AS market_size FROM predictors''')
    
    # Aggregate the per-country ratings to one row per id: share of each star
    # level, total number of ratings and number of rows (countries) for the id.
    sqlCtx.registerDataFrameAsTable(rating_country, "rating_country")
    rating_country = sqlCtx.sql('SELECT id\
    , CAST(SUM(star1)*1.0/(SUM(star1)+SUM(star2)+SUM(star3)+SUM(star4)+SUM(star5)) AS float) AS total_star1\
    , CAST(SUM(star2)*1.0/(SUM(star1)+SUM(star2)+SUM(star3)+SUM(star4)+SUM(star5)) AS float)AS total_star2\
    , CAST(SUM(star3)*1.0/(SUM(star1)+SUM(star2)+SUM(star3)+SUM(star4)+SUM(star5))AS float) AS total_star3\
    , CAST(SUM(star4)*1.0/(SUM(star1)+SUM(star2)+SUM(star3)+SUM(star4)+SUM(star5))AS float) AS total_star4\
    , CAST(SUM(star5)*1.0/(SUM(star1)+SUM(star2)+SUM(star3)+SUM(star4)+SUM(star5)) AS float)AS total_star5\
    , (SUM(star1)+SUM(star2)+SUM(star3)+SUM(star4)+SUM(star5)) AS total_star\
    , COUNT(1) AS nb_countries FROM rating_country GROUP BY id')

    sqlCtx.registerDataFrameAsTable(rating_country, "rating_country")

    predictors = predictors.join(rating_country,["id"],how='left_outer')
    
    #More on the ratings
    # For each star level, convert the three share columns back to counts
    # (share * total) and divide by a download figure.
    sqlCtx.registerDataFrameAsTable(predictors, "predictors")
    cmd = 'SELECT * '
    for i in range(1,6):
        cmd+=',(total_star'+str(i)+' * total_star / cumulative_downloads_2015_02) AS country_star'+str(i)+'_per_downloads '
        cmd+=',(review_rating_'+str(i)+' * nb_reviews / download_sum) AS review_star'+str(i)+'_per_downloads '
        cmd+=',(star'+str(i)+' * num_ratings / cumulative_downloads_2015_02) AS rating_star'+str(i)+'_per_downloads '
    cmd+= ' FROM predictors'

    predictors = sqlCtx.sql(cmd)
    
    # Spread between the largest and the smallest positive daily download
    # count, both absolute and relative to the 56-day total.
    sqlCtx.registerDataFrameAsTable(predictors, "predictors")
    predictors = sqlCtx.sql('SELECT *, CAST((max_step - min_step)*1.0/download_sum AS float) AS relative_step\
    , (max_step - min_step)  AS download_step \
    FROM predictors')
    

    sqlCtx.registerDataFrameAsTable(predictors, "predictors")

    predictors = sqlCtx.sql('SELECT *, get_gdp(country)  AS gdp_country \
    FROM predictors')
    
    # Nulls left by the outer joins and by divisions become 0, first on the
    # Spark side and again after the conversion to pandas.
    predictors = predictors.fillna(0)
    predictors.cache()
    return predictors.toPandas().sort_values(by="id").fillna(0)

In [None]:
# Build the training feature table.  The result is bound to `predictors_train`
# because that is the name every later cell reads; the assignment used to
# target `predictor_train` only, which left `predictors_train` undefined.
predictors_train = generate_predictors(  'train_app_downloads.csv'\
                                      , 'train_app_rating.csv'\
                                      , 'train_usage.csv'\
                                      , 'train_revenue.csv'\
                                      , 'train_cumulative_downloads_2015-02.csv'\
                                      , 'train_release_date.csv'\
                                      , 'train_rating_by_country.csv'\
                                      , 'train_app_review.csv')
# Keep the original singular name available for any code that still uses it.
predictor_train = predictors_train

In [None]:
# Build the test feature table.  The result is bound to `predictors_test`
# because that is the name every later cell reads; the assignment used to
# target `predictor_test` only, which left `predictors_test` undefined.
predictors_test = generate_predictors(   'test_set/test_app_downloads.csv'\
                                      , 'test_set/test_app_rating.csv'\
                                      , 'test_set/test_usage.csv'\
                                      , 'test_set/test_revenue.csv'\
                                      , 'test_set/test_cumulative_downloads_2015-02.csv'\
                                      , 'test_set/test_release_date.csv'\
                                      , 'test_set/test_rating_by_country.csv'\
                                      , 'test_set/test_app_review.csv')
# Keep the original singular name available for any code that still uses it.
predictor_test = predictors_test

## To Pandas


In [None]:
# Order both feature tables by (id, device).
predictors_train = predictors_train.sort_values(["id", "device"])
predictors_test = predictors_test.sort_values(["id", "device"])

# Attach the target column to the training rows; rows without a match keep
# NaN because the merge is a left join.
output_train = (
    pd.merge(predictors_train, output.toPandas(), on=["id", "device"], how="left")
    [["id", "device", "cumulative_downloads_2016_02"]]
)

# Column list with the text columns taken out ("id" is kept here).
numerical_predictors = list(predictors_train.columns.values)
#numerical_predictors.remove("id");
for _text_column in ("device", "category", "name", "country"):
    numerical_predictors.remove(_text_column)

# (id, device) pairs of each table as plain arrays.
predictors_train_id = predictors_train[["id", "device"]].as_matrix()
predictors_test_id = predictors_test[["id", "device"]].as_matrix()



#This is the metric we use to determine our performance
def metric(y_pred,y_test,percent=1.0):
    """Return the percentage overlap between the top-ranked items of two score lists.

    The `top` highest-scoring positions of `y_pred` are compared with the
    `top` highest-scoring positions of `y_test`, where
    `top = int(len(y_pred) / 100.0 * percent)`. The result is the size of the
    intersection expressed as a percentage of `top` (0.0 to 100.0).

    Parameters:
        y_pred: sequence of predicted scores.
        y_test: sequence of true scores, position-aligned with `y_pred`.
        percent: size of the compared top slice, as a percentage of
            `len(y_pred)`.

    Returns:
        float percentage of the predicted top slice that is also in the true
        top slice. Returns 0.0 when `y_pred` is empty.

    When the requested slice would contain no item (fewer than
    `100 / percent` items), one item is compared instead of dividing by zero.
    """
    n_items = len(y_pred)
    if n_items == 0:
        return 0.0
    # Clamp to one item so small inputs do not divide by zero.
    top = max(1, int(n_items/100.0*percent))

    def top_positions(scores):
        # Positions of the `top` largest scores; sorted() is stable, so ties
        # keep their original relative order.
        ranked = sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)
        return set(position for position, _ in ranked[:top])

    shared = top_positions(y_pred).intersection(top_positions(y_test))
    return len(shared)*100.0/top

In [None]:
1000 500 simplest weight

=========== 0
Total CV_1 : 68.5714285714
Total CV_2 : 66.6142668071
Total CV : 66.875
 --- 
  =========== 1
Total CV_1 : 71.9312169312
Total CV_2 : 67.5215946844
Total CV : 68.125
 --- 
  =========== 2
Total CV_1 : 67.5925925926
Total CV_2 : 67.4953407341
Total CV : 67.8125
 --- 

In [None]:
# Cross-validation settings.
K = 5                      # number of folds
top_percent_classif = 12   # percentage of rows kept by each first-stage model

# Fix the global NumPy seed for this run.
np.random.seed(1)

# Feature columns: every column of predictors_train except the key and the
# non-numeric descriptive columns removed here.
numerical_predictors = list(predictors_train.columns.values)
for _dropped in ("id", "device", "category", "name", "country"):
    numerical_predictors.remove(_dropped)

# Alternative hand-picked feature lists kept for reference:
#numerical_predictors = ['dl_projection','week_8','coef_2','week_5','downloads_per_day_before','cumulative_downloads_2015_02','week_7','downloads_std','coef_0','versions','download_sum','week_4','relative_download_increase','total_star3','days_since_release','market_size','week_3','review_rating_5','star3','review_rating_4','week_1','coef_1','m4_mean','review_rating_2','week_2','Uruguay','avg_review','nb_reviews']
#numerical_predictors = ['week_1', 'week_2', 'week_3', 'week_4', 'week_5', 'week_6', 'week_7', 'week_8', 'daily_avg', 'download_sum', 'coef_0', 'coef_1', 'coef_2', 'coef_3', 'max_step_old', 'min_step_old', 'download_std', 'nb_missing', 'iphone', 'ipad', 'Productivity', 'Entertainment', 'Travel', 'Sports', 'Music', 'Shopping', 'Finance', 'Business', 'Navigation', 'Food_and_Drink', 'Utilities', 'Newsstand', 'Health_and_Fitness', 'News', 'Lifestyle', 'Medical', 'Weather', 'Games', 'Catalogs', 'Social_Networking', 'Photo_and_Video', 'Reference', 'Books', 'Education', 'ko', 'other', 'jp', 'ch_zh', 'en', 'cumulative_downloads_2015_02', 'dl_projection', 'avg_review', 'star1', 'star2', 'star3', 'star4', 'star5', 'num_rating', 'ratings_per_downloads', 'versions', 'nb_review', 'review_per_downloads', 'days_since_release', 'downloads_per_day_before', 'review_per_day', 'ratings_per_day', 'avg_sentiment', 'm1_max', 'm1_mean', 'm1_coef_0', 'm1_coef_1', 'm1_coef_2', 'm2_max', 'm2_mean', 'm2_coef_0', 'm2_coef_1', 'm2_coef_2', 'm3_max', 'm3_mean', 'm3_coef_0', 'm3_coef_1', 'm3_coef_2', 'm4_max', 'm4_mean', 'm4_coef_0', 'm4_coef_1', 'm4_coef_2', 'rev_coef_0', 'rev_max', 'rev_mean', 'rev_coef_1', 'rev_coef_2']


# K-fold cross-validation of a two-stage pipeline scored with metric():
#   1. A random forest and a Lasso model each keep the test rows whose
#      prediction is strictly above the value ranked at top_percent_classif %
#      of the fold; the two selections are intersected into `mask`.
#   2. A gradient-boosting regressor is fitted on the log target with sample
#      weights, its test predictions are zeroed outside `mask`, and the
#      result is scored with metric() at its default percentage.
# The per-fold scores are collected in `new_top` and averaged at the end.
kf = KFold(len(predictors_train), n_folds=K,shuffle=False)
new_top = []
ndcg_top =[]
top_10 = []

# The feature/target matrices do not depend on the fold, so extract them once
# instead of rebuilding them four times per fold. `.values` replaces the
# deprecated DataFrame.as_matrix() and returns the same ndarray.
all_features = predictors_train[numerical_predictors].values
all_output = output_train[["cumulative_downloads_2016_02"]].values[:, 0]

for train, test in kf:
    train_features = all_features[train,:]
    train_output = all_output[train]
    test_features = all_features[test,:]
    test_output = all_output[test]

    # Only used by the commented-out diagnostics below.
    true_mask = test_output > sorted(test_output,reverse = True)[int(1.0/100*len(test))]

    # Rank position that marks the top `top_percent_classif` % of the fold.
    classif_cut = int(top_percent_classif*1.0/100*len(test))

    #Classifications
    # No random_state is set here, so the forest draws from the global
    # NumPy random state.
    mod_class= RandomForestRegressor(max_features = 'sqrt',n_estimators = 100)
    mod_class.fit(train_features, train_output)
    y_pred =  mod_class.predict(test_features)
    mask_1 = y_pred > sorted(y_pred,reverse = True)[classif_cut]

    print("classif model : "+str(metric(y_pred,test_output)))

    mod_class = linear_model.Lasso(alpha=100)
    mod_class.fit(train_features, train_output)
    y_pred =  mod_class.predict(test_features)
    mask_2 = y_pred > sorted(y_pred,reverse = True)[classif_cut]

    # Keep only rows selected by both first-stage models.
    mask = mask_1*mask_2


    #print "top classif 1: "+str(100.0*sum(true_mask*mask_1)/sum(true_mask))
    #print "top classif 2: "+str(100.0*sum(true_mask*mask_2)/sum(true_mask))
    #print "top classif inter: "+str(100.0*sum(mask_1*mask_2)/sum(mask_1))
    #mask = np.ones(len(test))

    # Sort the training target once and read every cut-off from that list.
    train_output_desc = sorted(train_output,reverse = True)
    threshold_20 = train_output_desc[int(len(train)*20.0/100)]
    threshold_12 = train_output_desc[int(len(train)*12.0/100)]
    threshold_5 = train_output_desc[int(len(train)*5.0/100)]
    threshold_1 = train_output_desc[int(len(train)*1.0/100)]
    threshold_0 = 0.0

    # NOTE(review): `relevance` holds argsort *indices* shifted by the 99 %
    # position, not per-row ranks - confirm this is intended before giving it
    # a non-zero coefficient in `weights`.
    relevance = np.asarray(sorted(range(len(train)), key=lambda x: train_output[x]))\
    -int(99.0*len(train)/100)
    relevance[relevance<0] = 0

    # Unweighted model, fitted only to find training rows whose top-1 %
    # membership is mispredicted (`hard_class`).
    # NOTE(review): np.log gives -inf for a zero target - presumably all
    # targets are positive; verify.
    mod_train = GradientBoostingRegressor(n_estimators=1000, learning_rate=0.01,max_depth=2, random_state=0, loss='ls')\
    .fit(train_features,np.log(train_output))
    train_pred =  mod_train.predict(train_features)

    pred_train_mask = train_pred > sorted(train_pred,reverse = True)[int(1.0/100*len(train))]
    true_train_mask = train_output > train_output_desc[int(1.0/100*len(train))]
    hard_class = np.logical_xor(pred_train_mask,true_train_mask)


    # Sample weights as a sum of indicator terms; the integer coefficients
    # are the tuning knobs (a 0 coefficient disables that term).
    weights = 1*(train_output >  threshold_0).astype(int)\
    +0*(train_output >  threshold_20).astype(int)\
    +1*(train_output >  threshold_12).astype(int)\
    +0*(train_output >  threshold_5).astype(int)\
    + 0*(train_output >  threshold_1).astype(int)\
    + 0*relevance\
    + 0*hard_class

    # Second-stage model: same target, weights normalised to sum to 1.
    mod_top1 = GradientBoostingRegressor(max_features =1.0 ,n_estimators=1000, learning_rate=0.01,max_depth=2, random_state=0, loss='ls')\
    .fit(train_features,np.log(train_output) ,sample_weight \
    =(weights)*1.0/weights.sum())

    #ndcg_model = learn(train_features,train_output,  n_trees=1000, learning_rate=0.01, k=300)
    #predi = evaluate(ndcg_model,test_features)*mask

    # Zero the predictions of rows rejected by the first stage.
    # NOTE(review): predictions are on the log scale, so a masked 0 would
    # outrank any negative prediction - presumably predictions are positive.
    y_pred_2 =  mod_top1.predict(test_features)*mask
    estimation_error = metric(y_pred_2,test_output)
    new_top.append(estimation_error)
    #ndcg_top.append(metric(predi,test_output))
    print(estimation_error)
    #print metric(predi,test_output)
print("____")
print("Top1% with classif1 : " + str(1.0*sum(new_top)/len(new_top)))
print("____")
#print "Top1% with classif1 : " + str(1.0*sum(ndcg_top)/len(ndcg_top))