In [None]:
import os

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.types import StructField, StructType, StringType, FloatType, IntegerType
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
import pyspark.sql.functions as F

# %pip install spark-nlp
from sparknlp.annotator import LemmatizerModel, Tokenizer, Normalizer, StopWordsCleaner, NGramGenerator
from sparknlp.base import Finisher, DocumentAssembler

pyspark.__version__

In [None]:
# Path to the GCP service-account key file. The standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable wins when it is set (and
# non-empty) so the notebook is not tied to one machine; otherwise the original
# local path is used.
credentials_path = (
    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    or "/home/ztmj96/.google/credentials/de-r-stocks.json"
)

# Spark configuration
# - spark.jars: connector jars for GCS and BigQuery, comma-separated and written
#   without whitespace so every entry is a clean path.
# - spark.jars.packages: Spark NLP, referenced by its Maven coordinates.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "gcs-connector-hadoop3-2.2.5.jar,spark-bigquery-latest_2.12.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_path)
    .set('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.3')
)

# Spark context
# getOrCreate() reuses an already-running context, so re-executing this cell no
# longer fails because a SparkContext already exists. NOTE: when a context is
# reused, `conf` is not re-applied -- restart the kernel after changing it.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS filesystem implementations and credentials with Hadoop so
# that gs:// paths can be read.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_path)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

# Start Spark session
# In production setting, master will be specified during spark-submit
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [None]:
# Read one day's submissions from the GCS data lake.
df = spark.read.parquet('gs://datalake_de-r-stocks/stocks/submission/stocks_submission_2022-02-06.parquet')

# Cleaning steps, applied in this order:
#   1. drop submissions whose author contains 'AutoModerator'
#   2. keep a single row per distinct title
#   3. turn the unix timestamp in created_utc into a 'yyyy-MM-dd' string
#      (NOTE(review): from_unixtime presumably uses the session time zone -- confirm)
#   4. keep only the title and date columns
not_automoderator = ~F.col('author').contains('AutoModerator')
date_string = F.from_unixtime(F.col('created_utc'), 'yyyy-MM-dd')

df_filter = (
    df.filter(not_automoderator)
    .dropDuplicates(['title'])
    .withColumn('date', date_string)
    .select('title', 'date')
)
            
# Spark NLP stages. Each stage reads the previous stage's output column:
# title -> document -> token -> normalized -> lemma -> cleaned -> ngrams -> finished

# Wrap the raw title string into a Spark NLP document annotation.
documentAssembler = (
    DocumentAssembler()
    .setInputCol('title')
    .setOutputCol('title_document')
)

# Split each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(['title_document'])
    .setOutputCol('title_token')
)

# Normalize tokens and lowercase them.
normalizer = (
    Normalizer()
    .setInputCols(['title_token'])
    .setOutputCol('title_normalized')
    .setLowercase(True)
)

# pretrained() is called without arguments, so Spark NLP's default lemmatizer
# model is used (presumably English -- confirm).
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['title_normalized'])
    .setOutputCol('title_lemma')
)

# Remove stop words, ignoring case.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['title_lemma'])
    .setOutputCol('title_cleaned')
    .setCaseSensitive(False)
)

# Cumulative n-grams up to n=2; multi-word n-grams are joined with "_".
ngrams_cum = (
    NGramGenerator()
    .setInputCols(["title_cleaned"])
    .setOutputCol("title_ngrams")
    .setN(2)
    .setEnableCumulative(True)
    .setDelimiter("_")  # Default is space
)

# Convert the n-gram annotations into the plain 'title_finished' column; the
# intermediate annotation columns are kept (cleanAnnotations=False).
finisher = (
    Finisher()
    .setInputCols(['title_ngrams'])
    .setOutputCols(['title_finished'])
    .setCleanAnnotations(False)
)

nlpPipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    ngrams_cum,
    finisher,
])

# Fit and apply the pipeline on the same data, keeping only the finished
# tokens and the date.
nlp_model = nlpPipeline.fit(df_filter)
df_result = nlp_model.transform(df_filter).select('title_finished', 'date')

# CountVectorizer over the finished tokens; minDF=3.0 is its minimum
# document-frequency threshold for a term to enter the vocabulary.
cv = CountVectorizer(minDF=3.0, inputCol='title_finished', outputCol='features')

# The vocabulary is learned from all submissions (one title per document).
model = cv.fit(df_result)

# Merge every title's token list into a single token list per date.
tokens_per_date = F.flatten(F.collect_list('title_finished')).alias('title_finished')
df_tokensbydate = df_result.groupBy('date').agg(tokens_per_date)

# Count vocabulary terms per date and bring the (date, features) rows to the driver.
counts = (
    model.transform(df_tokensbydate)
    .select('date', 'features')
    .collect()
)

# Schema of the long-format result: one row per (word, date) pair.
wordcount_schema = StructType(fields=[
    StructField("word", StringType()),
    StructField("count", FloatType()),
    StructField("date", StringType())])

# CountVectorizer produces a SparseVector per date: `indices` holds the
# vocabulary positions of the terms that occur on that date and `values` holds
# their counts, in matching order. Each count therefore has to be looked up
# through its index -- zipping `values` directly against the vocabulary would
# attach the counts to the first len(values) vocabulary words instead of the
# words that actually occurred.
vocabulary = model.vocabulary
wordcount_rows = [
    (vocabulary[int(term_index)], float(term_count), date_row['date'])
    for date_row in counts
    for term_index, term_count in zip(date_row['features'].indices,
                                      date_row['features'].values)
]

# Build the DataFrame once rather than unioning one small DataFrame per date.
# With an explicit schema this also works when there are no rows at all.
df_wordcountbydate = spark.createDataFrame(wordcount_rows, schema=wordcount_schema)

In [None]:
# Final column layout for the output table:
#   word            - unchanged
#   wordcount       - 'count' cast to an integer and renamed
#   submission_date - 'date' string parsed as a 'yyyy-MM-dd' date (the string column is dropped)
# NOTE: this reassigns df_wordcountbydate, so the cell cannot be re-run without
# re-running the cell that builds it.
df_wordcountbydate = df_wordcountbydate.select(
    'word',
    F.col('count').cast(IntegerType()).alias('wordcount'),
    F.to_date(F.col('date'), 'yyyy-MM-dd').alias('submission_date'),
)

In [None]:

# Upload the word counts to BigQuery through the 'bigquery' data source, using
# the data lake bucket as the connector's temporary GCS bucket. No save mode is
# set, so Spark's default mode applies.
bigquery_writer = (
    df_wordcountbydate.write
    .format('bigquery')
    .option('temporaryGcsBucket', 'datalake_de-r-stocks')
)
bigquery_writer.save('de-r-stocks.stocks_data.submission_wordcount')

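To run the script on a cluster, submit it with `spark-submit` (or `gcloud dataproc jobs submit` on Dataproc). The commands below are templates: replace the script name, arguments, and cluster details with the ones for this job.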

```sh
spark-submit \
    --master="${URL}" \
    06_spark_sql.py \
        --input_green=data/pq/green/2021/*/ \
        --input_yellow=data/pq/yellow/2021/*/ \
        --output=data/report-2021
```

```
gcloud dataproc jobs submit spark --properties spark.jars.packages=com.google.cloud.spark:spark-bigquery_2.11:0.9.1-beta

gcloud dataproc jobs submit pyspark --cluster=${CLUSTER} \
    /path/to/your/script.py \
    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar

spark-submit --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop2-latest.jar \
    /path/to/your/script.py
```