In [9]:
df = sqlContext.read.format('com.databricks.spark.csv') \
        .option("header", True) \
        .option("inferSchema", True) \
        .load('output/csv/chartshow/*/*.csv') \
        .selectExpr("rank", "date", "LOWER(LTRIM(RTRIM(artist))) AS artist", "LOWER(LTRIM(RTRIM(title))) AS title") \
        .createOrReplaceTempView("chartshow")
        
df = sqlContext.read.format('com.databricks.spark.csv') \
        .option("header", True) \
        .option("inferSchema", True) \
        .load('output/csv/*.csv') \
        .selectExpr("id","datetime","LOWER(LTRIM(RTRIM(artist))) AS artist","LOWER(LTRIM(RTRIM(title))) AS title","year","month","day","day_of_week","week","hour") \
        .createOrReplaceTempView("playlist")
        
spark.sql("""
SELECT
    YEAR(date) AS year,
    WEEKOFYEAR(date) AS weekofyear
FROM chartshow
GROUP BY YEAR(date), WEEKOFYEAR(date)
ORDER BY
    year ASC, weekofyear ASC
""").createOrReplaceTempView("weeks")

In [1]:
spark.sql("""
SELECT
   *
FROM playlist
WHERE 
    datetime >='2017-03-01 00:00:00'
    AND artist = 'dead man winter'
ORDER BY
    datetime DESC
""").show()

ERROR: An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line string', (1, 0))



AnalysisException: u'Table or view not found: playlist; line 4 pos 5'

In [14]:
weekofyear = 28
year = 2016
spark.sql("""
SELECT
    pl.year,
    pl.weekofyear,
    p.date,
    pl.artist,
    pl.title,
    pl.play_counts,
    p.rank
FROM 
(
    SELECT 
        YEAR(datetime) AS year, 
        WEEKOFYEAR(datetime) AS weekofyear, 
        artist,
        title,
        COUNT(*) AS play_counts
    FROM playlist
    GROUP BY 
        artist, title, YEAR(datetime), WEEKOFYEAR(datetime) 
) AS pl
LEFT OUTER JOIN (
SELECT 
    YEAR(date) AS year, 
    WEEKOFYEAR(date) AS weekofyear, 
    date,
    rank, 
    artist,
    title
FROM 
    chartshow
GROUP BY 
    artist,
    title,
    YEAR(date), 
    WEEKOFYEAR(date), 
    date,
    rank
) AS p ON p.year = pl.year AND p.weekofyear = pl.weekofyear AND p.artist = pl.artist AND p.title = pl.title
--INNER JOIN weeks w ON pl.year = w.year AND pl.weekofyear = w.weekofyear
WHERE pl.weekofyear={weekofyear} AND pl.year = {year}
AND rank IS NOT NULL
ORDER BY 
    year, weekofyear, rank ASC, play_counts DESC
""".format(weekofyear=weekofyear, year=year)).show(50)

+----+----------+--------------------+--------------------+--------------------+-----------+----+
|year|weekofyear|                date|              artist|               title|play_counts|rank|
+----+----------+--------------------+--------------------+--------------------+-----------+----+
|2016|        28|2016-07-13 00:00:...|               lizzo|        good as hell|         15|   1|
|2016|        28|2016-07-13 00:00:...|  the avett brothers|        ain't no man|          6|   2|
|2016|        28|2016-07-13 00:00:...|     case/lang/veirs|    best kept secret|         14|   3|
|2016|        28|2016-07-13 00:00:...|                beck|                 wow|         16|   4|
|2016|        28|2016-07-13 00:00:...|           radiohead|      burn the witch|         14|   5|
|2016|        28|2016-07-13 00:00:...|               adele|send my love (to ...|         10|   6|
|2016|        28|2016-07-13 00:00:...|    courtney barnett|       debbie downer|          9|   7|
|2016|        28|201

In [93]:
df = spark.sql(
"""
SELECT 
    p.*,
    CASE WHEN next.rank IS NULL THEN 0 ELSE next.rank END AS nextrank,
    CASE WHEN current.rank IS NULL THEN 0 ELSE current.rank END AS currentrank,
    CASE WHEN next.rank IS NOT NULL THEN 1 ELSE 0 END AS label
FROM playlist p
LEFT OUTER JOIN 
(
    SELECT 
        artist,
        title,
        rank
    FROM
        chartshow
    WHERE 
        YEAR(date) = 2016
        AND WEEKOFYEAR(date) = 28
)
next ON p.artist = next.artist AND p.title = next.title
LEFT OUTER JOIN 
(
    SELECT 
        artist,
        title,
        rank
    FROM
        chartshow
    WHERE 
        YEAR(date) = 2016
        AND WEEKOFYEAR(date) = 27
)
current ON p.artist = current.artist AND p.title = current.title
WHERE datetime >= '{start}' AND datetime <= '{end}'
""".format(start="2016-07-06 00:00:00", end="2016-07-13 18:00:00"))
df = df.dropna()

In [94]:
splits = df.randomSplit([0.8, 0.2])
train = splits[0].cache()
test = splits[1].cache()

In [99]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler


artistInd = StringIndexer(inputCol="artist", outputCol="artistIndex").setHandleInvalid("skip")
titleInd = StringIndexer(inputCol="title", outputCol="titleIndex").setHandleInvalid("skip")
dayOfWeekInd = StringIndexer(inputCol="day_of_week", outputCol="dayOfWeekIndex").setHandleInvalid("skip")
assembler = VectorAssembler(inputCols=["artistIndex", "dayOfWeekIndex", "titleIndex", "hour", "day", "month", "year","currentrank"], outputCol="features")

In [106]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(maxIter=10, threshold=0.1)
pipeline = Pipeline().setStages([artistInd, titleInd, dayOfWeekInd, assembler, lr])

model = pipeline.fit(train)

In [107]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

results = model.transform(test)
predictionsAndLabels = results.select("prediction", "label") \
                              .rdd.map(lambda r: (float(r["prediction"]), float(r["label"])))
metrics = BinaryClassificationMetrics(predictionsAndLabels)
print "AUC: {0}".format(metrics.areaUnderROC)
print "AUP: {0}".format(metrics.areaUnderPR)


AUC: 0.796034708909
AUP: 0.682446091644


In [103]:
from pyspark.sql.functions import udf, col
results.filter(col("prediction") == 1.0).take(20)

[Row(id=u'026e8854578b2a072003f636851fae00', datetime=datetime.datetime(2016, 7, 12, 10, 50), artist=u'margaret glaspy', title=u'you and i', year=2016, month=7, day=12, day_of_week=u'Tuesday', week=28, hour=10, nextrank=0, currentrank=12, label=0, artistIndex=11.0, titleIndex=28.0, dayOfWeekIndex=5.0, features=DenseVector([11.0, 5.0, 28.0, 10.0, 12.0, 7.0, 2016.0, 12.0]), rawPrediction=DenseVector([-1.9558, 1.9558]), probability=DenseVector([0.1239, 0.8761]), prediction=1.0),
 Row(id=u'0508b888804c11de0f5ec627c3394bac', datetime=datetime.datetime(2016, 7, 7, 3, 34), artist=u'radiohead', title=u'burn the witch', year=2016, month=7, day=7, day_of_week=u'Thursday', week=27, hour=3, nextrank=5, currentrank=4, label=1, artistIndex=7.0, titleIndex=3.0, dayOfWeekIndex=1.0, features=DenseVector([7.0, 1.0, 3.0, 3.0, 7.0, 7.0, 2016.0, 4.0]), rawPrediction=DenseVector([0.5721, -0.5721]), probability=DenseVector([0.6392, 0.3608]), prediction=1.0),
 Row(id=u'06688c3ad6f857780b25cdb366aef526', datet

In [10]:
from datetime import datetime, timedelta
def get_dates(chartshow_date):
    results = {} 
    results["end_date"] = "{0} 20:00:00".format(chartshow_date.strftime("%Y-%m-%d"))
    start_date = chartshow_date - timedelta(days=7)
    results["start_date"] = "{0} 00:00:00".format(start_date.strftime("%Y-%m-%d"))
    results["current_weekofyear"] = chartshow_date.isocalendar()[1] -1
    results["next_weekofyear"] = chartshow_date.isocalendar()[1]
    results["last_year"] = chartshow_date.year
    print results
    return results
    
def get_training_data(chartshow_date):
    dates = get_dates(chartshow_date)
    
    df = spark.sql(
    """
    SELECT 
        p.*,
        CASE WHEN next.rank IS NULL THEN 0 ELSE next.rank END AS nextrank,
        CASE WHEN current.rank IS NULL THEN 0 ELSE current.rank END AS currentrank,
        CASE WHEN next.rank IS NOT NULL THEN 1 ELSE 0 END AS label
    FROM playlist p
    LEFT OUTER JOIN 
    (
        SELECT 
            artist,
            title,
            rank
        FROM
            chartshow
        WHERE 
            YEAR(date) = {year}
            AND WEEKOFYEAR(date) = {next_weekofyear}
    )
    next ON p.artist = next.artist AND p.title = next.title
    LEFT OUTER JOIN 
    (
        SELECT 
            artist,
            title,
            rank
        FROM
            chartshow
        WHERE 
            YEAR(date) = {year}
            AND WEEKOFYEAR(date) = {current_weekofyear}
    )
    current ON p.artist = current.artist AND p.title = current.title
    WHERE
    datetime >= '{start}' AND datetime <= '{end}'
    ORDER BY p.datetime DESC
    """.format(year=dates["last_year"], current_weekofyear=dates['current_weekofyear'], next_weekofyear=dates["next_weekofyear"], start=dates["start_date"], end=dates["end_date"]))
    return df
    
get_training_data(datetime(2017,3,1)).show(50)



{'current_weekofyear': 8, 'next_weekofyear': 9, 'start_date': '2017-02-22 00:00:00', 'end_date': '2017-03-01 20:00:00', 'last_year': 2017}
+--------------------+--------------------+--------------------+--------------------+----+-----+---+-----------+----+----+--------+-----------+-----+
|                  id|            datetime|              artist|               title|year|month|day|day_of_week|week|hour|nextrank|currentrank|label|
+--------------------+--------------------+--------------------+--------------------+----+-----+---+-----------+----+----+--------+-----------+-----+
|6fc071f418678eb80...|2017-03-01 19:59:...|       elliott smith|   i figured you out|2017|    3|  1|  Wednesday|   9|  19|      20|          0|    1|
|16e43ddf0a8deaf9b...|2017-03-01 19:55:...|         tame impala|    'cause i'm a man|2017|    3|  1|  Wednesday|   9|  19|       0|          0|    0|
|25abaf72431c2f38b...|2017-03-01 19:51:...|  the japanese house|   face like thunder|2017|    3|  1|  Wednesday