In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline

import findspark
findspark.init()

# Import and start a Spark session.
from pyspark.sql import *
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

examples.directory is deprecated; in the future, examples will be found relative to the 'datapath' directory.
  "found relative to the 'datapath' directory.".format(key))


In [2]:
def load_insights(months):
    return {month: spark.read.parquet('hashtags_insights_{}.parquet'.format(month)) for month in months} 

We start by loading the different hashtags insights for each months. The month of January is omitted as it is corrupted, the month of June is also omitted as it is too small (we only have data for an extract of two days), and the month december does not exists.

In [3]:
hashtags = load_insights(['02', '03', '04', '05', '07', '08', '09', '10', '11'])
#hashtags = load_insights(['02', '03'])

In [5]:
all_days = [str(i) for i in range(1, 32)]
all_months = [str(i).zfill(2) for i in range(2, 4)]

In [6]:
allColumns = \
    ["tag", "count", "print"] + \
    [month + "_print_" + day for day in all_days for month in all_months] + \
    [month + "_count_" + day for day in all_days for month in all_months]

We define a function which will allow us to unify the columns of every sub dataframes by renaming some columns and adding some others.

In [77]:
def rename_columns(insights, prefix):
    printCols = [c for c in insights.columns if "print_" in c]
    nameCols = [c for c in insights.columns if "count_" in c]
    print(insights, prefix)
    for x in printCols + nameCols:
        insights = insights.withColumnRenamed(x, prefix + "_" + x)
    columns = insights.columns
    for i, x in enumerate(allColumns):
        if i % 50 == 0:
            print(i, len(allColumns))
        if not x in columns:
            insights = insights.withColumn(x, F.lit(0).cast("long"))
    return insights

We then apply this function to all of our insights.

In [78]:
insights = [rename_columns(hashtags[key], key) for key in hashtags]

DataFrame[tag: string, count: bigint, print: bigint, print_1: bigint, print_2: bigint, print_3: bigint, print_4: bigint, print_5: bigint, print_6: bigint, print_7: bigint, print_8: bigint, print_9: bigint, print_10: bigint, print_11: bigint, print_12: bigint, print_13: bigint, print_14: bigint, print_15: bigint, print_16: bigint, print_17: bigint, print_18: bigint, print_19: bigint, print_20: bigint, print_21: bigint, print_22: bigint, print_23: bigint, print_24: bigint, print_25: bigint, print_26: bigint, print_27: bigint, print_28: bigint, print_29: bigint, print_30: bigint, print_31: bigint, count_1: bigint, count_2: bigint, count_3: bigint, count_4: bigint, count_5: bigint, count_6: bigint, count_7: bigint, count_8: bigint, count_9: bigint, count_10: bigint, count_11: bigint, count_12: bigint, count_13: bigint, count_14: bigint, count_15: bigint, count_16: bigint, count_17: bigint, count_18: bigint, count_19: bigint, count_20: bigint, count_21: bigint, count_22: bigint, count_23: b

We merge every insights in one unique dataframe. We have to make sure that every columns is in the same order.

In [64]:
from functools import reduce

def unionAll(dfs):
    return reduce(DataFrame.union, dfs)
unifiedInsights = unionAll([i.select(allColumns) for i in insights])

As some hashtags might appear in different months, we need to merge duplicates occurencies of the same hashtags.

In [71]:
columnsToSum = [x for x in unifiedInsights.columns if x != "tag"]
aggExpressions = [F.sum(F.col(col)).alias(col) for col in columnsToSum]

In [15]:
unifiedInsightsGB = unifiedInsights.groupBy('tag').agg(*aggExpressions)

In [16]:
unifiedInsightsGB.write.mode("overwrite").parquet("unifiedinsights.parquet")

In [26]:
unifiedInsights = spark.read.parquet("unifiedinsights.parquet")

In [32]:
unifiedInsights.where(F.col('11_print_4') != 0).count()

0

In [33]:
hashtags["11"].where(F.col("print_4") != 0).count()

233063

In [18]:
for x in allColumns:
    if unifiedInsights.where(F.col(x) != 0).count() == 0:
        unifiedInsights = unifiedInsights.drop(x)

In [29]:
[x for x in unifiedInsights.columns if "11" in x]

['02_print_11',
 '02_count_11',
 '11_print_1',
 '11_print_2',
 '11_print_3',
 '11_print_4',
 '11_print_5',
 '11_print_6',
 '11_print_7',
 '11_print_8',
 '11_print_9',
 '11_print_10',
 '01_print_11',
 '03_print_11',
 '04_print_11',
 '05_print_11',
 '06_print_11',
 '07_print_11',
 '08_print_11',
 '09_print_11',
 '10_print_11',
 '11_print_11',
 '12_print_11',
 '11_print_12',
 '11_print_13',
 '11_print_14',
 '11_print_15',
 '11_print_16',
 '11_print_17',
 '11_print_18',
 '11_print_19',
 '11_print_20',
 '11_print_21',
 '11_print_22',
 '11_print_23',
 '11_print_24',
 '11_print_25',
 '11_print_26',
 '11_print_27',
 '11_print_28',
 '11_print_29',
 '11_print_30',
 '11_print_31',
 '11_count_1',
 '11_count_2',
 '11_count_3',
 '11_count_4',
 '11_count_5',
 '11_count_6',
 '11_count_7',
 '11_count_8',
 '11_count_9',
 '11_count_10',
 '01_count_11',
 '03_count_11',
 '04_count_11',
 '05_count_11',
 '06_count_11',
 '07_count_11',
 '08_count_11',
 '09_count_11',
 '10_count_11',
 '11_count_11',
 '12_count

In [25]:
unifiedInsights.columns

['count',
 'print',
 '02_print_1',
 '02_print_2',
 '02_print_3',
 '02_print_4',
 '02_print_5',
 '02_print_6',
 '02_print_7',
 '02_print_8',
 '02_print_9',
 '02_print_10',
 '02_print_11',
 '02_print_12',
 '02_print_13',
 '02_print_14',
 '02_print_15',
 '02_print_16',
 '02_print_17',
 '02_print_18',
 '02_print_19',
 '02_print_20',
 '02_print_21',
 '02_print_22',
 '02_print_23',
 '02_print_24',
 '02_print_25',
 '02_print_26',
 '02_print_27',
 '02_print_28',
 '02_print_29',
 '02_print_30',
 '02_print_31',
 '02_count_1',
 '02_count_2',
 '02_count_3',
 '02_count_4',
 '02_count_5',
 '02_count_6',
 '02_count_7',
 '02_count_8',
 '02_count_9',
 '02_count_10',
 '02_count_11',
 '02_count_12',
 '02_count_13',
 '02_count_14',
 '02_count_15',
 '02_count_16',
 '02_count_17',
 '02_count_18',
 '02_count_19',
 '02_count_20',
 '02_count_21',
 '02_count_22',
 '02_count_23',
 '02_count_24',
 '02_count_25',
 '02_count_26',
 '02_count_27',
 '02_count_28',
 '02_count_29',
 '02_count_30',
 '02_count_31']

In [21]:
[x for x in unifiedInsights.columns if "01" in x]

[]

In [22]:
unifiedInsights.write.mode("overwrite").parquet("insights.parquet")

In [23]:
unifiedInsights.where(F.col('03_count_1') != 0).count()


0

In [24]:
hashtags["03"].columns

['tag',
 'count',
 'print',
 'print_1',
 'print_2',
 'print_3',
 'print_4',
 'print_5',
 'print_6',
 'print_7',
 'print_8',
 'print_9',
 'print_10',
 'print_11',
 'print_12',
 'print_13',
 'print_14',
 'print_15',
 'print_16',
 'print_17',
 'print_18',
 'print_19',
 'print_20',
 'print_21',
 'print_22',
 'print_23',
 'print_24',
 'print_25',
 'print_26',
 'print_27',
 'print_28',
 'print_29',
 'print_30',
 'print_31',
 'count_1',
 'count_2',
 'count_3',
 'count_4',
 'count_5',
 'count_6',
 'count_7',
 'count_8',
 'count_9',
 'count_10',
 'count_11',
 'count_12',
 'count_13',
 'count_14',
 'count_15',
 'count_16',
 'count_17',
 'count_18',
 'count_19',
 'count_20',
 'count_21',
 'count_22',
 'count_23',
 'count_24',
 'count_25',
 'count_26',
 'count_27',
 'count_28',
 'count_29',
 'count_30',
 'count_31']