# Part 6.1 - Prepare Dataframe
---

### Papers Past Topic Modeling


Ben Faulks - bmf43@uclive.ac.nz

Xiandong Cai - xca24@uclive.ac.nz

Yujie Cui - ycu23@uclive.ac.nz

In [1]:
import os, sys, subprocess
sys.path.insert(0, '../utils')  # make the custom modules in ../utils importable
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils_load import conf_pyspark, load_dataset
from utils_preplot import preplot

# initiate PySpark
sc, spark = conf_pyspark()

# list the Spark configuration in use
sc.getConf().getAll()

[('spark.app.name', 'local'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '192.168.1.207'),
 ('spark.driver.memory', '62g'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1547960005066'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '44856'),
 ('spark.driver.cores', '6'),
 ('spark.driver.maxResultSize', '4g')]


## 1 Prepare Dataframe for Full Dataset

**After training on the full dataset, we obtained the topic word list and the doc-topic matrix. We now load them to generate dataframes for analysis and visualization.**

### 1.1 Load data

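**Load the full dataset and extract the metadata ("id", "region" and "date"). "id" is renamed to "id_" so that it does not clash with the "id" column of the doc-topic matrix when the two are joined later:**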

In [2]:
df_meta = load_dataset('dataset', spark).select(F.col('id').alias('id_'), F.col('region'), F.col('date'))

**Load the topic word list:**

In [3]:
path = r'../5-model/model_train/topicKeys.txt'

data_schema = StructType([
    StructField('topic', IntegerType()),
    StructField('weight', FloatType()),
    StructField('words', StringType())
])

df_topics = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .option("delimiter", "\t")
    .schema(data_schema)
    .load(path)
)
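**The schema above assumes each line of `topicKeys.txt` holds three tab-separated fields: the topic number, a weight, and the space-separated top words. A minimal pure-Python sketch of parsing one such line, which can be handy for sanity-checking the file before loading it with Spark; `parse_topic_key` and `TopicKey` are hypothetical names, and the example line reuses topic 1 from the output of the next cell:**

```python
from typing import List, NamedTuple


class TopicKey(NamedTuple):
    topic: int
    weight: float
    words: List[str]


def parse_topic_key(line: str) -> TopicKey:
    """Split one 'topic<TAB>weight<TAB>words' line into typed fields.

    Assumes the same three-column, tab-separated layout as the Spark schema.
    """
    topic, weight, words = line.rstrip("\r\n").split("\t")
    # words are space-separated; split() also drops any trailing space
    return TopicKey(int(topic), float(weight), words.split())


line = "1\t0.01015\tbank cent loan zealand london banks money exchange rate deposits\n"
key = parse_topic_key(line)
print(key.topic, key.weight, key.words[:3])  # 1 0.01015 ['bank', 'cent', 'loan']
```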

In [4]:
topic_number = df_topics.count()
print('Shape of dataframe: ({}, {})'.format(topic_number, len(df_topics.columns)))
df_topics.show(20, False)

Shape of dataframe: (250, 3)
+-----+-------+---------------------------------------------------------------------------------+
|topic|weight |words                                                                            |
+-----+-------+---------------------------------------------------------------------------------+
|0    |0.01008|mr party government minister labour sir ward joseph country massey               |
|1    |0.01015|bank cent loan zealand london banks money exchange rate deposits                 |
|2    |0.00692|company apply freight steam passage zealand passengers shipping steamers agents  |
|3    |0.03237|man people men public law good fact make things state                            |
|4    |0.00528|fruit apples pears case potatoes plums peaches oranges poultry lb                |
|5    |0.04785|good time year made week great work present day weather                          |
|6    |0.00788|napier hastings bay hawke waipawa dannevirke woodville danevirke mr tarada

**Load the doc-topic matrix:**

In [5]:
path = r'../5-model/model_train/docTopics.txt'

# column names: row index, document id, then one column per topic ("0" ... "249")
columns = ['index', 'id'] + [str(x) for x in range(topic_number)]

# load data
df_doctopic = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "true")
    .option("delimiter", "\t")
    .load(path)
)

# replace Spark's default column names (_c0, _c1, ...) with the names above
df_doctopic = df_doctopic.toDF(*columns)

In [6]:
print('Shape of dataframe: ({}, {})'.format(df_doctopic.count(), len(df_doctopic.columns)))

# show the first 5 rows
df_doctopic.limit(5).toPandas().head()

Shape of dataframe: (160140, 252)


|   | index | id | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... | 240 | 241 | 242 | 243 | 244 | 245 | 246 | 247 | 248 | 249 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1854232 | 6.3e-05 | 6.3e-05 | 0.187764 | 0.000203 | 3.3e-05 | 0.000299 | 4.9e-05 | 6.6e-05 | ... | 0.000112 | 5.5e-05 | 7.1e-05 | 4.2e-05 | 4.8e-05 | 8.5e-05 | 6.9e-05 | 4.7e-05 | 6.5e-05 | 2.8e-05 |
| 1 | 1 | 1854244 | 0.000146 | 0.000147 | 0.000101 | 0.00047 | 7.7e-05 | 0.000695 | 0.000115 | 0.000153 | ... | 0.000261 | 0.000127 | 0.000165 | 9.7e-05 | 0.000111 | 0.000198 | 0.000161 | 0.000108 | 0.000151 | 6.5e-05 |
| 2 | 2 | 1854262 | 6.3e-05 | 6.4e-05 | 4.4e-05 | 0.000204 | 3.3e-05 | 0.000301 | 5e-05 | 6.6e-05 | ... | 0.000113 | 5.5e-05 | 7.1e-05 | 4.2e-05 | 4.8e-05 | 8.6e-05 | 7e-05 | 4.7e-05 | 6.6e-05 | 2.8e-05 |
| 3 | 3 | 1854275 | 6.3e-05 | 6.3e-05 | 0.068874 | 0.000203 | 3.3e-05 | 0.000299 | 4.9e-05 | 6.6e-05 | ... | 0.000112 | 5.5e-05 | 7.1e-05 | 4.2e-05 | 4.8e-05 | 8.5e-05 | 6.9e-05 | 4.7e-05 | 6.5e-05 | 2.8e-05 |
| 4 | 4 | 1854588 | 0.000109 | 0.000109 | 7.5e-05 | 0.000349 | 5.7e-05 | 0.000516 | 8.5e-05 | 0.000113 | ... | 0.689759 | 9.4e-05 | 0.000122 | 7.2e-05 | 8.2e-05 | 0.000147 | 0.000119 | 8e-05 | 0.000112 | 4.9e-05 |


**In the dataframe above, the "index" column is the row number, the "id" column is the sample/document/text id (the same as "id" in the dataset), and columns "0" to "249" hold the weight of each topic.**
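**Following that description, each line of the doc-topic file holds a row number, a document id and one weight per topic. A minimal pure-Python sketch of parsing such a line, assuming tab separation as in the Spark reader above; `parse_doc_topics` is a hypothetical helper and the 4-topic line uses made-up weights. It also checks that a document's weights sum to about 1, which the yearly averaging later relies on:**

```python
import math
from typing import List, Tuple


def parse_doc_topics(line: str, n_topics: int) -> Tuple[int, str, List[float]]:
    """Parse 'index<TAB>id<TAB>w_0<TAB>...<TAB>w_{n-1}' into its parts."""
    fields = line.rstrip("\r\n").split("\t")
    if len(fields) != n_topics + 2:
        raise ValueError(f"expected {n_topics + 2} fields, got {len(fields)}")
    index, doc_id = int(fields[0]), fields[1]
    weights = [float(x) for x in fields[2:]]
    return index, doc_id, weights


# Toy line with 4 topics (made-up values, not from the real model)
index, doc_id, weights = parse_doc_topics("0\t1854232\t0.1\t0.6\t0.2\t0.1", n_topics=4)
print(index, doc_id, weights)
# a document's topic proportions should sum to (approximately) 1
print(math.isclose(sum(weights), 1.0))
```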

### 1.2 Add Dominant Topics Column

**Find the dominant topic (the topic with the largest weight) of each document:**

In [6]:
# https://stackoverflow.com/questions/46819405/how-to-get-the-name-of-column-with-maximum-value-in-pyspark-dataframe

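def argmax(cols, *args):
    """Return the name of the first column holding the row's maximum value."""
    best = max(args)  # compute the row maximum once instead of once per column
    return next(c for c, v in zip(cols, args) if v == best)

def search_dominant(df):
    """
    find the dominant topic of each sample/row/document
    input: doc-topic dataframe ("index", "id", then one weight column per topic)
    output: dataframe with "index_" and "dominant" (topic number as a string)
    """
    # bind the topic column names, then apply the UDF to each row's weights
    argmax_udf = lambda cols: F.udf(lambda *args: argmax(cols, *args), StringType())
    return (df
            .withColumn('dominant', argmax_udf(df.columns[2:])(*df.columns[2:]))
            .select(F.col('index').alias('index_'), F.col('dominant')))

df_dominant = search_dominant(df_doctopic)

# join the dominant topic back onto the doc-topic matrix by row index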
df_doctopic = (df_doctopic
               .join(df_dominant, df_doctopic.index == df_dominant.index_)
               .drop('index_')
               .orderBy('index'))
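**For intuition, the UDF picks, for each row, the name of the first topic column holding the maximum weight, so ties resolve to the lowest topic number. A plain-Python equivalent on toy weights (made-up values; `dominant_topic` is a hypothetical name) makes the tie-breaking explicit:**

```python
from typing import List


def dominant_topic(weights: List[float]) -> str:
    """Index of the largest weight, returned as a string like the UDF's StringType.

    max() returns the first maximal item, so ties go to the lowest topic
    number, matching the UDF's first-match rule.
    """
    return str(max(range(len(weights)), key=weights.__getitem__))


print(dominant_topic([0.1, 0.5, 0.3, 0.1]))  # 1
print(dominant_topic([0.4, 0.1, 0.4, 0.1]))  # 0 (tie between topics 0 and 2)
```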

In [7]:
print('Shape of dataframe: ({}, {})'.format(df_doctopic.count(), len(df_doctopic.columns)))

df_doctopic.limit(5).toPandas().head()

Shape of dataframe: (160140, 253)


|   | index | id | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... | 241 | 242 | 243 | 244 | 245 | 246 | 247 | 248 | 249 | dominant |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1854232 | 6.3e-05 | 6.3e-05 | 0.187764 | 0.000203 | 3.3e-05 | 0.000299 | 4.9e-05 | 6.6e-05 | ... | 5.5e-05 | 7.1e-05 | 4.2e-05 | 4.8e-05 | 8.5e-05 | 6.9e-05 | 4.7e-05 | 6.5e-05 | 2.8e-05 | 2 |
| 1 | 1 | 1854244 | 0.000146 | 0.000147 | 0.000101 | 0.00047 | 7.7e-05 | 0.000695 | 0.000115 | 0.000153 | ... | 0.000127 | 0.000165 | 9.7e-05 | 0.000111 | 0.000198 | 0.000161 | 0.000108 | 0.000151 | 6.5e-05 | 196 |
| 2 | 2 | 1854262 | 6.3e-05 | 6.4e-05 | 4.4e-05 | 0.000204 | 3.3e-05 | 0.000301 | 5e-05 | 6.6e-05 | ... | 5.5e-05 | 7.1e-05 | 4.2e-05 | 4.8e-05 | 8.6e-05 | 7e-05 | 4.7e-05 | 6.6e-05 | 2.8e-05 | 200 |
| 3 | 3 | 1854275 | 6.3e-05 | 6.3e-05 | 0.068874 | 0.000203 | 3.3e-05 | 0.000299 | 4.9e-05 | 6.6e-05 | ... | 5.5e-05 | 7.1e-05 | 4.2e-05 | 4.8e-05 | 8.5e-05 | 6.9e-05 | 4.7e-05 | 6.5e-05 | 2.8e-05 | 211 |
| 4 | 4 | 1854588 | 0.000109 | 0.000109 | 7.5e-05 | 0.000349 | 5.7e-05 | 0.000516 | 8.5e-05 | 0.000113 | ... | 9.4e-05 | 0.000122 | 7.2e-05 | 8.2e-05 | 0.000147 | 0.000119 | 8e-05 | 0.000112 | 4.9e-05 | 240 |


### 1.3 Add Metadata Column

**Here we add only "region" and "year" as metadata columns. The "date" column of the full dataset is accurate to the day, but since we currently analyze the data by year rather than by day or another unit, we simply convert each date to its year.**

In [8]:
df_doctopic = (df_doctopic
               .join(df_meta, df_doctopic.id == df_meta.id_)
               .withColumn('year', F.date_format('date', 'yyyy'))
               .drop('id_')
               .drop('date')
               .orderBy('index'))
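**`DataFrame.join` performs an inner join by default, so a document whose "id" had no match in the metadata would be dropped; the row count staying at 160140 suggests every document matched. A plain-Python sketch of the same join-and-extract-year logic on toy records (ids and dates are made up), using `strftime('%Y')` as the analogue of `F.date_format('date', 'yyyy')`; like the Spark version, it yields the year as a string:**

```python
from datetime import date

# Toy doc-topic rows (index, id) and metadata keyed by id: made-up values
doc_rows = [(0, 101), (1, 102), (2, 103)]
meta = {
    101: ("Auckland", date(1863, 5, 2)),
    102: ("Canterbury", date(1887, 11, 30)),
    # id 103 has no metadata, so the inner join drops it
}

# Inner join on id, then replace the full date with a 4-digit year string
joined = [
    (index, doc_id, meta[doc_id][0], meta[doc_id][1].strftime("%Y"))
    for index, doc_id in doc_rows
    if doc_id in meta
]
print(joined)  # [(0, 101, 'Auckland', '1863'), (1, 102, 'Canterbury', '1887')]
```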

In [10]:
print('Shape of dataframe: ({}, {})'.format(df_doctopic.count(), len(df_doctopic.columns)))

df_doctopic.limit(5).toPandas().head()

Shape of dataframe: (160140, 255)


|   | index | id | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... | 243 | 244 | 245 | 246 | 247 | 248 | 249 | dominant | region | year |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1854232 | 6.3e-05 | 6.3e-05 | 0.187764 | 0.000203 | 3.3e-05 | 0.000299 | 4.9e-05 | 6.6e-05 | ... | 4.2e-05 | 4.8e-05 | 8.5e-05 | 6.9e-05 | 4.7e-05 | 6.5e-05 | 2.8e-05 | 2 | Auckland | 1863 |
| 1 | 1 | 1854244 | 0.000146 | 0.000147 | 0.000101 | 0.00047 | 7.7e-05 | 0.000695 | 0.000115 | 0.000153 | ... | 9.7e-05 | 0.000111 | 0.000198 | 0.000161 | 0.000108 | 0.000151 | 6.5e-05 | 196 | Auckland | 1863 |
| 2 | 2 | 1854262 | 6.3e-05 | 6.4e-05 | 4.4e-05 | 0.000204 | 3.3e-05 | 0.000301 | 5e-05 | 6.6e-05 | ... | 4.2e-05 | 4.8e-05 | 8.6e-05 | 7e-05 | 4.7e-05 | 6.6e-05 | 2.8e-05 | 200 | Auckland | 1863 |
| 3 | 3 | 1854275 | 6.3e-05 | 6.3e-05 | 0.068874 | 0.000203 | 3.3e-05 | 0.000299 | 4.9e-05 | 6.6e-05 | ... | 4.2e-05 | 4.8e-05 | 8.5e-05 | 6.9e-05 | 4.7e-05 | 6.5e-05 | 2.8e-05 | 211 | Auckland | 1863 |
| 4 | 4 | 1854588 | 0.000109 | 0.000109 | 7.5e-05 | 0.000349 | 5.7e-05 | 0.000516 | 8.5e-05 | 0.000113 | ... | 7.2e-05 | 8.2e-05 | 0.000147 | 0.000119 | 8e-05 | 0.000112 | 4.9e-05 | 240 | Canterbury | 1887 |


### 1.4 Dominant Topics Dataframe

**The doc-topic matrix is high-dimensional and hard to plot directly, so we need to transform it to extract or reduce features. First, we generate a dominant-topics dataframe, which can reveal the relationship between dominant topics and region/year.**

In [9]:
df_dominant = (df_doctopic
               .join(df_topics, df_doctopic.dominant == df_topics.topic)
               .select(F.col('id'), 
                       F.col('region'), 
                       F.col('year'), 
                       F.col('dominant'), 
                       F.col('words')))

In [10]:
print('Shape of dataframe: ({}, {})'.format(df_dominant.count(), len(df_dominant.columns)))

df_dominant.limit(5).show(5, False)

Shape of dataframe: (160140, 5)
+-------+----------+----+--------+--------------------------------------------------------------------------------+
|id     |region    |year|dominant|words                                                                           |
+-------+----------+----+--------+--------------------------------------------------------------------------------+
|1854232|Auckland  |1863|2       |company apply freight steam passage zealand passengers shipping steamers agents |
|1854244|Auckland  |1863|196     |business public street begs attention notice mr inform support patronage        |
|1854262|Auckland  |1863|200     |life man world great years english work men people time                         |
|1854275|Auckland  |1863|211     |goods street stock prices large wholesale cash orders general advertisements    |
|1854588|Canterbury|1887|240     |french government russia paris troops france british turkish russian turkey     |
+-------+----------+----+--------+--------------------------------------------------------------------------------+
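**One way to relate dominant topics to region and year is to count documents per (region, year, dominant topic). A plain-Python sketch on toy rows (made-up values); in Spark the analogue would be `df_dominant.groupBy('region', 'year', 'dominant').count()`:**

```python
from collections import Counter

# Toy (id, region, year, dominant) rows: made-up values for illustration
rows = [
    (1, "Auckland", "1863", "2"),
    (2, "Auckland", "1863", "2"),
    (3, "Auckland", "1863", "196"),
    (4, "Canterbury", "1887", "240"),
]

# Number of documents for each (region, year, dominant topic) combination
counts = Counter((region, year, dominant) for _, region, year, dominant in rows)
for key, n in sorted(counts.items()):
    print(key, n)
```

Because the UDF returns "dominant" as a string, ordering it lexicographically places "196" before "2"; cast it to an integer first if numeric order matters.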

**Save the dataframe for later use:**

In [13]:
path = r'./train/domTopics'

df_dominant.write.csv(path, mode='overwrite', compression='gzip')

In [14]:
%%bash -s "$path"

cat $1/*.csv.gz > $1/domTopics.csv.gz

gunzip $1/domTopics.csv.gz
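**The bash cell works because a gzip file may consist of several concatenated members, which `gunzip` decompresses as one continuous stream; and since `write.csv` writes no header by default, the merged result is a single valid CSV. As a sketch, the same step could be done with the Python standard library, which would avoid repeating the bash cell for every subset. `merge_gzip_parts` is a hypothetical helper, not part of the project's modules, and it assumes the part files match `*.csv.gz`:**

```python
import glob
import gzip
import os
import shutil
import tempfile


def merge_gzip_parts(directory: str, out_name: str = "domTopics.csv") -> str:
    """Decompress every *.csv.gz part file in `directory` into one plain CSV.

    Parts are read in sorted order (part-00000, part-00001, ...), like the
    shell glob passed to `cat`. Returns the path of the merged file.
    """
    out_path = os.path.join(directory, out_name)
    parts = sorted(glob.glob(os.path.join(directory, "*.csv.gz")))
    with open(out_path, "wb") as out:
        for part in parts:
            with gzip.open(part, "rb") as src:
                shutil.copyfileobj(src, out)
    return out_path


# Demo: two fake part files standing in for Spark's gzip-compressed output
with tempfile.TemporaryDirectory() as tmp:
    for i, text in enumerate(["1,Auckland,1863\n", "2,Canterbury,1887\n"]):
        with gzip.open(os.path.join(tmp, f"part-{i:05d}.csv.gz"), "wt") as f:
            f.write(text)
    with open(merge_gzip_parts(tmp)) as f:
        merged_text = f.read()

print(merged_text, end="")
```

For example, `merge_gzip_parts('./train/domTopics')` would write `./train/domTopics/domTopics.csv`, the same file the bash cell produces.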

### 1.5 Average Weight Topics Dataframe

**Besides the dominant-topics dataframe, we can calculate the average weight of each topic in each year and build a dataframe from it, which reveals how the weight of each topic varies over time. The topic weights of each document are already normalized (each lies in 0-1 and together they sum to 1), so the average weights of each year also sum to 1 and need no further scaling.**

In [11]:
df_avgweight = (df_doctopic.drop('index', 'id', 'dominant', 'region')
                .groupBy('year').avg().orderBy('year'))
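**The claim that each year's average weights sum to 1 follows from linearity: if each of the n documents in a year has weights summing to 1, the averaged weights sum to (1/n) × (sum of the n document totals) = (1/n) × n = 1. A plain-Python check on toy data (made-up weights for 3 topics), mirroring `groupBy('year').avg()`:**

```python
import math
from collections import defaultdict

# Toy (year, weights) rows; each document's weights sum to 1 (made-up values)
docs = [
    ("1863", [0.7, 0.2, 0.1]),
    ("1863", [0.1, 0.1, 0.8]),
    ("1887", [0.3, 0.3, 0.4]),
]

by_year = defaultdict(list)
for year, weights in docs:
    by_year[year].append(weights)

# Average each topic's weight within a year (analogue of groupBy('year').avg())
avg_weight = {
    year: [sum(col) / len(rows) for col in zip(*rows)]
    for year, rows in sorted(by_year.items())
}

for year, avgs in avg_weight.items():
    print(year, [round(a, 3) for a in avgs], "sum =", round(sum(avgs), 6))
```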

In [12]:
df_avgweight.limit(5).toPandas().head()

|   | year | avg(0) | avg(1) | avg(2) | avg(3) | avg(4) | avg(5) | avg(6) | avg(7) | avg(8) | ... | avg(240) | avg(241) | avg(242) | avg(243) | avg(244) | avg(245) | avg(246) | avg(247) | avg(248) | avg(249) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1840 | 8.1e-05 | 0.008279 | 5.6e-05 | 0.023775 | 4.2e-05 | 0.000384 | 6.3e-05 | 0.013187 | 4.7e-05 | ... | 0.000144 | 7e-05 | 9.1e-05 | 5.4e-05 | 6.1e-05 | 0.000109 | 8.9e-05 | 6e-05 | 0.011574 | 3.6e-05 |
| 1 | 1841 | 0.000161 | 0.011706 | 0.004822 | 0.030666 | 0.000254 | 0.005892 | 0.000126 | 0.007246 | 9.5e-05 | ... | 0.000287 | 0.00014 | 0.000182 | 0.000107 | 0.000122 | 0.001599 | 0.001406 | 0.000119 | 0.107492 | 7.2e-05 |
| 2 | 1842 | 0.000116 | 0.00316 | 0.00245 | 0.023355 | 6.1e-05 | 0.012671 | 9.1e-05 | 0.000454 | 6.8e-05 | ... | 0.001251 | 0.000101 | 0.000131 | 7.7e-05 | 8.8e-05 | 0.000489 | 0.001364 | 0.011884 | 0.101871 | 5.2e-05 |
| 3 | 1843 | 0.000675 | 0.000822 | 0.003062 | 0.044746 | 4.8e-05 | 0.00265 | 7.1e-05 | 9.5e-05 | 0.000136 | ... | 0.004082 | 0.000938 | 0.000852 | 0.000275 | 6.9e-05 | 0.000389 | 0.004093 | 6.7e-05 | 0.047911 | 4.1e-05 |
| 4 | 1844 | 0.000519 | 0.000147 | 0.000698 | 0.050104 | 0.003802 | 0.009987 | 0.000114 | 0.000153 | 8.6e-05 | ... | 0.00026 | 0.000127 | 0.000336 | 9.7e-05 | 0.000111 | 0.000397 | 0.001362 | 0.000108 | 0.005803 | 6.5e-05 |


**Check that the years are identical to those in the doc-topic dataframe:**

In [17]:
print('Year range of average weight dataframe:', 
      df_avgweight.select(F.min('year'), F.max('year')).first(), 
      '\nYear number:',
      df_avgweight.select('year').distinct().count())
print('Year range of doc-topic      dataframe:', 
      df_doctopic.select(F.min('year'), F.max('year')).first(), 
      '\nYear number:',
      df_doctopic.select('year').distinct().count())

Year range of average weight dataframe: Row(min(year)='1840', max(year)='1945') 
Year number: 103
Year range of doc-topic      dataframe: Row(min(year)='1840', max(year)='1945') 
Year number: 103


**The average weight dataframe is small (year_n rows by topic_n + 1 columns, including "year"), so we save it directly using pandas:**

In [18]:
path = r'./train/avgWeight/'

df_avgweight.toPandas().to_csv(path+'avgWeight.csv', header=False, index=False, encoding='utf-8')

## 2 Prepare Dataframe for Subset

**The process for each subset is the same as for the full dataset, so we wrapped it in a function, `preplot` (imported from `utils_preplot`), and call it for each subset.**
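**The implementation of `preplot` lives in `utils_preplot` and is not reproduced here. Purely as an illustration, a hypothetical plain-Python function with analogous inputs and outputs could chain the steps of Section 1: attach region/year metadata (inner-join semantics), pick each document's dominant topic, look up its words, and average topic weights per year. The name `preplot_sketch`, its arguments and its data structures are assumptions, not the module's actual code:**

```python
from collections import defaultdict
from typing import Dict, List, Tuple

DocRow = Tuple[int, int, List[float]]  # (index, id, topic weights)


def preplot_sketch(
    doc_topics: List[DocRow],
    meta: Dict[int, Tuple[str, str]],  # id -> (region, year)
    topic_words: Dict[int, str],  # topic number -> top words
):
    """Hypothetical stand-in for preplot(): returns (dominant rows, yearly averages)."""
    dominant_rows = []
    by_year: Dict[str, List[List[float]]] = defaultdict(list)
    for _, doc_id, weights in doc_topics:
        if doc_id not in meta:  # inner-join semantics: skip docs without metadata
            continue
        region, year = meta[doc_id]
        # first index holding the maximum weight (lowest topic wins ties);
        # kept as an int here, whereas the Spark UDF returns a string
        dominant = max(range(len(weights)), key=weights.__getitem__)
        dominant_rows.append((doc_id, region, year, dominant, topic_words[dominant]))
        by_year[year].append(weights)
    avg_weight = {
        year: [sum(col) / len(rows) for col in zip(*rows)]
        for year, rows in sorted(by_year.items())
    }
    return dominant_rows, avg_weight


# Toy inputs (made-up values)
docs = [(0, 101, [0.7, 0.3]), (1, 102, [0.2, 0.8]), (2, 103, [0.5, 0.5])]
meta = {101: ("Auckland", "1863"), 102: ("Auckland", "1863"), 103: ("Canterbury", "1887")}
words = {0: "bank cent loan", 1: "fruit apples pears"}

dom, avg = preplot_sketch(docs, meta, words)
print(dom)
print(avg)
```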

### 2.1 By Range of Time

**Generate dataframes:**

In [4]:
file = r'../5-model/model_wwi/docTopicsInfer.txt'

df_dominant, df_avgweight = preplot(file, df_meta, df_topics, spark)

**Save dataframes:**

In [6]:
path = r'./wwi/avgWeight/'
df_avgweight.toPandas().to_csv(path+'avgWeight.csv', header=False, index=False, encoding='utf-8')

path = r'./wwi/domTopics'
df_dominant.write.csv(path, mode='overwrite', compression='gzip')

**Merge the part files into a single CSV file:**

In [7]:
%%bash -s "$path"

cat $1/*.csv.gz > $1/domTopics.csv.gz

gunzip $1/domTopics.csv.gz

### 2.2 By Region

**Generate dataframes:**

In [11]:
file = r'../5-model/model_regions/docTopicsInfer.txt'

df_dominant, df_avgweight = preplot(file, df_meta, df_topics, spark)

**Save dataframes:**

In [13]:
path = r'./regions/avgWeight/'
df_avgweight.toPandas().to_csv(path+'avgWeight.csv', header=False, index=False, encoding='utf-8')

path = r'./regions/domTopics'
df_dominant.write.csv(path, mode='overwrite', compression='gzip')

**Merge the part files into a single CSV file:**

In [14]:
%%bash -s "$path"

cat $1/*.csv.gz > $1/domTopics.csv.gz

gunzip $1/domTopics.csv.gz

### 2.3 By Label

**Generate dataframes:**

In [15]:
file = r'../5-model/model_ads/docTopicsInfer.txt'

df_dominant, df_avgweight = preplot(file, df_meta, df_topics, spark)

**Save dataframes:**

In [16]:
path = r'./ads/avgWeight/'
df_avgweight.toPandas().to_csv(path+'avgWeight.csv', header=False, index=False, encoding='utf-8')

path = r'./ads/domTopics'
df_dominant.write.csv(path, mode='overwrite', compression='gzip')

**Merge the part files into a single CSV file:**

In [17]:
%%bash -s "$path"

cat $1/*.csv.gz > $1/domTopics.csv.gz

gunzip $1/domTopics.csv.gz

---