### Spark setup

In [1]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType

In [2]:
import string
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [3]:
from pyspark.sql import SparkSession

# Local Spark session with 9 worker threads and a 15g driver-memory setting.
# eagerEval makes a DataFrame that is the last expression of a cell display eagerly.
spark = (
    SparkSession.builder
    .master('local[9]')
    .config("spark.driver.memory", "15g")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

### Load data

In [4]:
# Total-counts file: records are separated by tabs (hence lineSep='\t'), and the fields
# inside a record by commas (the CSV reader's default delimiter). No header row.
totalcounts = (
    spark.read
    .option('header', False)
    .option('lineSep', '\t')
    .csv('/data/shared1/total_counts/totalcounts-2')
)

In [5]:
# Workaround for the bigram input layout: each line holds the bigram text followed by
# tab-separated "year,count,volume" records. Reading with ";" as the delimiter keeps a whole
# line in _c0 (as long as it contains no ";"); it is then split on tabs into c0..c{MAX_FIELDS-1},
# where c0 is the bigram and c1.. are the records (null past the end of a line).

folders = ["/data/shared1/bigrams/Part1", "/data/shared1/bigrams/Part2", "/data/shared1/bigrams/Part3", "/data/shared1/bigrams/Part4"]

# Alternative input set:
#folders = ["/data/shared1/bigrams/part4", "/data/shared1/bigrams/part5", "/data/shared1/bigrams/part6"]

MAX_FIELDS = 500  # tab-separated fields kept per line (bigram + up to 499 year records)

raw_bigrams = spark.read.format("csv").option("delimiter", ";").load(folders)

# Build every c<i> column in ONE select. Calling withColumn in a loop adds one projection per
# call, and 500 of them produce a very deep plan (slow analysis, possible StackOverflowError).
line_fields = split(col("_c0"), "\t")
df = raw_bigrams.select(
    [c for c in raw_bigrams.columns if c != "_c0"]
    + [line_fields.getItem(i).alias("c" + str(i)) for i in range(MAX_FIELDS)]
)

### Clean data

In [6]:
# Give the positional total-counts columns names, type the year, and keep 1900 onwards.
# The count columns stay strings here.
totalcounts = (
    totalcounts
    .withColumnRenamed('_c0', 'year')
    .withColumnRenamed('_c1', 'bigram_count')
    .withColumnRenamed('_c2', 'page_count')
    .withColumnRenamed('_c3', 'volume_count')
)
totalcounts = totalcounts.withColumn('year', F.col('year').cast('int')).where(F.col('year') >= 1900)

In [7]:
def to_long(df, by):
    """Melt (unpivot) every column of ``df`` that is not in ``by`` into key/value rows.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Wide input. All columns outside ``by`` must share one Spark type, because they
        are packed into a single array of structs.
    by : str or iterable of str
        Identifier column(s) carried through unchanged.

    Returns
    -------
    pyspark.sql.DataFrame
        The ``by`` columns followed by ``key`` (name of the melted column) and ``val``
        (its value): one output row per input row per melted column.

    Raises
    ------
    ValueError
        If no columns are left to melt once ``by`` is excluded.
    AssertionError
        If the columns to melt do not all have the same type.
    """
    # Accept a single column name or any iterable; the selects below need a list.
    by = [by] if isinstance(by, str) else list(by)

    # (name, type) for each column that will be melted
    value_cols = [(c, t) for (c, t) in df.dtypes if c not in by]
    if not value_cols:
        raise ValueError(f"to_long: no columns left to melt after excluding {by!r}")
    cols, dtypes = zip(*value_cols)

    # Spark SQL arrays must be homogeneous, so all melted columns need one type
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

In [8]:
def transform(df):
    """Parse melted bigram rows into bigram, year, count and the two POS tags.

    Expects the output of ``to_long(df, ["c0"])``: ``c0`` holds the bigram text and ``val``
    holds one comma-separated record whose first two fields are year and count.

    Returns a DataFrame with columns bigram, year (int), count (long), POS1, POS2,
    restricted to years >= 1900.
    """
    # Record fields: 0 = year, 1 = count. The third field (volume) is not used downstream.
    record = F.split(df['val'], ',')
    df = (
        df
        .withColumn('year', record.getItem(0).cast('int'))
        # long, not int: a per-record count above 2**31 - 1 would not fit in int
        # (null in legacy mode, an error under ANSI mode); the later sum is long anyway.
        .withColumn('count', record.getItem(1).cast('long'))
        .withColumnRenamed('c0', 'bigram')
    )

    # Presumably "<word1>_<POS1> <word2>_<POS2>": split into the two words on a space, then
    # take the second "_"-separated piece of each word as its POS tag.
    words = F.split(F.col('bigram'), ' ')
    df = (
        df
        .withColumn('POS1', F.split(words.getItem(0), '_').getItem(1))
        .withColumn('POS2', F.split(words.getItem(1), '_').getItem(1))
        .drop('key', 'val')
    )

    return df.where(F.col('year') >= 1900)

In [9]:
from pyspark.sql.functions import lower

def clean(df):
    """Keep clean ADJ/NOUN bigrams and sum their counts per (lower-cased) bigram and year.

    Expects the columns produced by ``transform``: bigram, year, count, POS1, POS2.

    Steps:
      1. keep rows where both tokens are tagged ADJ or NOUN;
      2. drop bigrams containing the literal "_NOUN_" / "_ADJ_" (presumably bare tag tokens);
      3. on the tag-free text, require a space with word characters on both sides, and reject
         any of the characters ``.,;:_'^?<>``, any digit, and any non-ASCII character;
      4. lower-case the bigram (tags included) and sum counts, so case variants merge.

    Returns a DataFrame with columns bigram, year, count.
    """
    df = df.where(col('POS1').isNotNull() & col('POS2').isNotNull())

    df = df.where(((col('POS1') == 'ADJ') | (col('POS1') == 'NOUN')) &
                  ((col('POS2') == 'ADJ') | (col('POS2') == 'NOUN')))

    df = df.where(~df.bigram.contains('_NOUN_') & ~df.bigram.contains('_ADJ_'))

    # Tag-free copy of the bigram, used only by the text filters below.
    df = df.withColumn('bigram_noPOS', regexp_replace(regexp_replace('bigram', '_NOUN', ''), '_ADJ', ''))

    # Two words: the (unanchored) pattern needs word characters on both sides of a space.
    # Raw strings so Python does not treat "\w" / "\x" as (invalid) string escapes.
    df = df.where(df.bigram_noPOS.rlike(r"\w+ \w+"))
    df = df.where(~(df.bigram_noPOS.rlike(r"[.,;:_'^?<>]+")))
    df = df.where(~(df.bigram_noPOS.rlike(r"[0-9]+")))

    # Lower-case the whole bigram (POS tags become "_noun"/"_adj") so that case variants are
    # summed together by the groupBy below.
    df = df.withColumn('bigram', lower(col('bigram')))

    # ASCII only: reject any character outside \x00-\x7F. Done with a JVM regex instead of a
    # Python UDF, so rows are not serialized to Python workers and nulls cannot raise.
    df = df.where(~df.bigram_noPOS.rlike(r"[^\x00-\x7F]"))

    df = df.drop('POS1', 'POS2', 'bigram_noPOS')

    return df.groupBy('bigram', 'year').agg(F.sum('count').alias('count'))

In [10]:
# Melt every column except c0 into (c0, key, val) rows, one row per original column.
df_long = to_long(df, by=["c0"])

In [11]:
# Drop rows containing any null, mostly the padding columns past the end of each input line.
df_long = df_long.na.drop()

In [12]:
# Parse each record into year / count and extract the two POS tags.
df = transform(df=df_long)

In [13]:
# Keep clean ADJ/NOUN bigrams and aggregate counts per bigram and year.
df = clean(df=df)

In [14]:
# Complete (bigram, year) grid for 1900-2019: every bigram gets one row per year, and years
# in which it does not occur get count 0. The full outer join also keeps any df rows that
# fall outside the grid.
unique_bigrams = df.select('bigram').distinct()
periods = unique_bigrams.crossJoin(
    spark.range(1900, 2019 + 1).select(col('id').alias('year'))
)

full = (
    periods
    .join(df, ['bigram', 'year'], how="full")
    .fillna({'count': 0})
)

In [15]:
# Attach each year's total bigram count (as total_bigrams) to every (bigram, year) row.
# Renaming happens inside the select instead of reassigning `totalcounts`. This keeps its
# `year` column intact, so the earlier cell that reads totalcounts['year'] can still be re-run.
year_totals = totalcounts.select(
    F.col('year').alias('year_totalcounts'),
    F.col('bigram_count').alias('total_bigrams'),
)
df = full.join(year_totals, full.year == year_totals.year_totalcounts, how='left')
df = df.drop('year_totalcounts')

In [16]:
# Inspect the joined schema. total_bigrams is still a string here: the total-counts CSV was read
# without schema inference and its counts were never cast. It is divided by in the next section.
df.printSchema()

root
 |-- bigram: string (nullable = true)
 |-- year: long (nullable = true)
 |-- count: long (nullable = false)
 |-- total_bigrams: string (nullable = true)



### Calculate stats

In [17]:
# Each bigram's yearly count as a percentage of all bigrams recorded that year.
# total_bigrams is a string column (see the schema printed above). Cast it explicitly rather
# than relying on Spark's implicit string-to-number coercion, whose rules depend on ANSI mode.
# Years missing from the total counts yield a null percentage.
df = df.withColumn('bigram_percent', F.col('count') / F.col('total_bigrams').cast('long') * 100)

df = df.drop('total_bigrams')

In [18]:
# Smoothed percentage: centred moving average over the current year and SMOOTHING_YEARS
# years on each side. The window shrinks at the ends of each bigram's series.
# NOTE(review): an earlier comment called this "smoothing 3", but the frame is ±2 rows
# (a 5-year window); the ±2 behaviour is kept — confirm which was intended.
# A row-based frame equals a year-based one here only because the grid filled earlier gives
# every bigram one row per year.
SMOOTHING_YEARS = 2
w_smooth = Window.partitionBy('bigram').orderBy('year').rowsBetween(-SMOOTHING_YEARS, SMOOTHING_YEARS)

df = df.withColumn('bigram_percent_smooth', F.avg('bigram_percent').over(w_smooth))

### Filter out bigrams that are never above the threshold for at least 3 consecutive years

In [20]:
# 1 when the bigram's share (bigram_percent, already multiplied by 100) is strictly above
# the threshold, otherwise 0; a null share also counts as 0.
threshold = 5e-6
df = df.withColumn('over_threshold', F.when(F.col('bigram_percent') > threshold, 1).otherwise(0))

In [21]:
# Gaps-and-islands: within a bigram ordered by year, the overall row number minus the row
# number inside the (bigram, over_threshold) partition is constant along each run of
# consecutive rows that share the same flag, so `grp` labels each run.
w1 = Window.partitionBy('bigram').orderBy('year')
w2 = Window.partitionBy('bigram', 'over_threshold').orderBy('year')
df = df.withColumn('grp', F.row_number().over(w1) - F.row_number().over(w2))

# streak_1: position of the year inside its run of above-threshold years (1, 2, 3, ...),
# or 0 for years at or below the threshold.
w3 = Window.partitionBy('bigram', 'over_threshold', 'grp').orderBy('year')
streakdf = df.withColumn(
    'streak_1',
    F.when(F.col('over_threshold') == 0, 0).otherwise(F.row_number().over(w3)),
)

In [22]:
# Keep bigrams whose longest above-threshold run is at least 3 years (max streak > 2),
# then drop the helper columns.
w4 = Window.partitionBy('bigram')
cleandf = (
    streakdf
    .withColumn('max_streak', F.max('streak_1').over(w4))
    .where(F.col('max_streak') > 2)
    .drop('over_threshold', 'grp', 'streak_1', 'max_streak')
)

### Save data

In [27]:
# Hash-partition by bigram so all rows of a bigram land in the same output file, sorted by
# bigram then year within each file. CSV without a header row; with the default save mode the
# write fails if the directory already exists.
# NOTE(review): clean() lower-cases every bigram, yet the target folder is named
# "uppercase_clean" — confirm the output path.
(
    cleandf
    .repartition('bigram')
    .sortWithinPartitions('bigram', 'year')
    .write
    .csv('/data/shared1/cleandata/uppercase_clean')
)