<a href="https://colab.research.google.com/github/mazinkamal134/DS_MRP_2024/blob/main/Master_Dataset_Processing_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook generates the master tweet dataset, from which all individual disorder datasets can be created. It follows the steps below:
- Ingest the tweets (control and treatment)
- Add the diagnosis dates and treatment dates
- Add the music sessions
- Add the TensiStrength stress scores
- Add the user info
- Add the demographic info

It depends on two other pipelines:
- TensiStrength pipeline
- Demographic inference pipeline



In [None]:
import pandas as pd
import pickle
from datetime import datetime
import os
import json

In [None]:
# Mount Google Drive so the data directories configured below are reachable under /content/drive
from google.colab import drive as colabDrive

colabDrive.mount('/content/drive')

Mounted at /content/drive


### Global Params

In [None]:
# Root of all project data on the mounted Google Drive; every directory below lives under it
dataRoot = "/content/drive/MyDrive/Master-2024/MRP/Data"

tweetsDir = os.path.join(dataRoot, "Tweets")
datesDir = os.path.join(tweetsDir, "Treatment Dates")
tensiStrenghtDir = os.path.join(dataRoot, "TensiStrength")
demographicsDir = os.path.join(dataRoot, "Demographics")
authorsDir = os.path.join(dataRoot, "Authors")
musicDir = os.path.join(dataRoot, "Music")

# Propensity-score-matching strata, one directory per disorder
psmDir = os.path.join(authorsDir, "PSM")
anxietyStrataDir = os.path.join(psmDir, "anxiety", "Stratums")
depressionStrataDir = os.path.join(psmDir, "depression", "Stratums")
ptsdStrataDir = os.path.join(psmDir, "ptsd", "Stratums")

In [None]:
# Set to True only when modifying an already-processed dataset (e.g. updating the
# demographic or stress scores); then continue with the relevant section only.
# Leave False for a full run, which rebuilds tweetsDf from the raw CSVs in the steps below.
LOAD_PROCESSED_TWEETS = False

if LOAD_PROCESSED_TWEETS:
  fileName = os.path.join(tweetsDir, "processedTweets.parquet")
  tweetsDf = pd.read_parquet(fileName)

### Ingest the control tweets

In [None]:
# Load the control-group tweets from CSV and tag every row with group = 0
fileName = os.path.join(tweetsDir, "control_tweets.csv")
controlTweetsDf = pd.read_csv(fileName).assign(group = 0)
print("Control Shape:", controlTweetsDf.shape)

### Ingest the treatment tweets

In [None]:
# Load the treatment-group tweets from CSV and tag every row with group = 1
fileName = os.path.join(tweetsDir, "treatment_tweets.csv")
treatmentTweetsDf = pd.read_csv(fileName).assign(group = 1)
print("Treatment Shape:", treatmentTweetsDf.shape)

### Combine control and treatment

In [None]:
# Stack control and treatment tweets into one frame. ignore_index=True gives a fresh
# RangeIndex; without it the 0..n labels of both inputs repeat, so any label-based
# .loc assignment on tweetsDf would hit several unrelated rows at once.
tweetsDf = pd.concat([controlTweetsDf, treatmentTweetsDf], ignore_index = True)
print("Shape combined:", tweetsDf.shape)

# Free up memory (only needed on local Jupyter; not needed on Google Colab with High-RAM)
#del controlTweetsDf
#del treatmentTweetsDf

# Fix the data types.
# tz_convert(None) converts tz-aware timestamps to UTC and drops the timezone, leaving
# created_at tz-naive.
tweetsDf["created_at"] = pd.to_datetime(tweetsDf.created_at).dt.tz_convert(None)
# author_id is the join key for all later merges, so it is kept as a string.
# NOTE(review): if author_id was parsed as float (e.g. because of missing values) this
# yields strings like "123.0" — confirm it is read as an integer column.
tweetsDf["author_id"] = tweetsDf["author_id"].astype("str")

# Keep only the columns used downstream, in a fixed order
cols = ["id", "tweet_type", "referenced_tweet_type", "created_at", "lang", "disorder", "author_id", "text", "cleaned_text", "retweet_count", "reply_count", "like_count", "quote_count", "source", "group"]
tweetsDf = tweetsDf[cols]

### Get the diagnosis dates


In [None]:
# One row per author: the calendar date of that author's earliest "diagnose" tweet.
# Only author_id and created_at are needed; created_at is reduced to a date before taking the minimum.
diagnoseTweets = tweetsDf.loc[tweetsDf.tweet_type == "diagnose", ["author_id", "created_at"]]
diagnosisDatesDf = (
    diagnoseTweets
    .assign(diagnosis_date = diagnoseTweets["created_at"].dt.date)
    .groupby("author_id")["diagnosis_date"]
    .min()
    .reset_index()
)
print ("Diagnosis Dates Shape:", diagnosisDatesDf.shape)
diagnosisDatesDf.sample()

#### Add the diagnosis dates to all tweets

In [None]:
# Attach each author's diagnosis date to every one of their tweets (missing when the author has none)
tweetsDf = pd.merge(tweetsDf, diagnosisDatesDf, how = "left", on = "author_id")
print("Shape after updating with diagnosis date:", tweetsDf.shape)
# Number of tweets (not authors) whose author has no diagnosis date
unmatchedCount = tweetsDf["diagnosis_date"].isna().sum()
print("Unmatched:", unmatchedCount)
tweetsDf.sample()

### Populate the treatment dates

In [None]:
# Treatment ("anchor") dates come from the per-user feature summary JSON files, one per group.
# Their "diagnose_date" field is used as the treatment_date.
controlFilePath = os.path.join(datesDir, "control_users_features_summary.json")
treatmentFilePath = os.path.join(datesDir, "treatment_users_features_summary.json")


def _readFeatureSummary(path, group):
  """Load one feature-summary JSON file as one row per author, tagged with `group`.

  After transpose().reset_index() the JSON's top-level keys end up in the "index"
  column, which is renamed to author_id below.
  """
  with open(path, "rb") as f:
    summary = json.load(f)
  summaryDf = pd.DataFrame(summary).transpose().reset_index()
  summaryDf["Group"] = group
  return summaryDf


treatmentDatesDf = pd.concat(
    [_readFeatureSummary(controlFilePath, 0), _readFeatureSummary(treatmentFilePath, 1)],
    ignore_index = True,
)

# Keep only the id, the date (renamed to treatment_date) and the group.
# .copy() makes this an independent frame, so the dtype fix below does not raise
# SettingWithCopyWarning.
treatmentDatesDf = (
    treatmentDatesDf
    .rename(columns = {"index": "author_id", "diagnose_date": "treatment_date"})
    [["author_id", "treatment_date", "Group"]]
    .copy()
)
treatmentDatesDf["treatment_date"] = pd.to_datetime(treatmentDatesDf["treatment_date"])

# Drop exact duplicate rows. drop_duplicates returns a new frame, so the result must be assigned.
treatmentDatesDf = treatmentDatesDf.drop_duplicates()
print("Shape:", treatmentDatesDf.shape)

# An author can still appear more than once (different dates or groups); keep each
# author's earliest treatment date.
treatmentDatesDf = treatmentDatesDf.groupby(["author_id"]).treatment_date.min().reset_index()
treatmentDatesDf.sample()

In [None]:
# Attach each author's treatment date to every one of their tweets
tweetsDf = pd.merge(tweetsDf, treatmentDatesDf, how = "left", on = "author_id")
print("Shape after adding the treatment dates", tweetsDf.shape)
tweetsDf.sample()

### Add the music sessions

In [None]:
# Load the music sessions: each row links a tweet (tweet_id, renamed to id) to a music_id.
# Only these two columns are used downstream, so no other column is type-converted.
musicFilePath = os.path.join(musicDir, "music.csv")
musicDf = (
    pd.read_csv(musicFilePath)
    .rename(columns = {"tweet_id": "id"})
    [["id", "music_id"]]
    .drop_duplicates()
)
print("Shape:", musicDf.shape)

# A tweet linked to several music ids is duplicated by the left merge in the next cell
# (one row per music id), so report how many such tweets there are.
multiMusicTweets = (musicDf.groupby("id")["music_id"].nunique() > 1).sum()
print("Tweets with multiple music ids:", multiMusicTweets)

# Last top-level expression, so Jupyter displays it
musicDf.sample()

In [None]:
# Left-join the music ids onto the tweets; tweets without a music session get music_id 0.
# A tweet with several music ids ends up with one row per id.
tweetsDf = pd.merge(tweetsDf, musicDf, how = "left", on = "id")
print("Shape after adding the music sessions", tweetsDf.shape)
tweetsDf["music_id"] = tweetsDf["music_id"].fillna(value = 0)
tweetsDf.sample()

In [None]:
# Every tweet that carries a music session (music_id != 0) is relabelled as a "treatment" tweet
hasMusic = tweetsDf["music_id"] != 0
tweetsDf["tweet_type"] = tweetsDf["tweet_type"].mask(hasMusic, "treatment")
tweetsDf.sample()

### Add Demographics
Before running this part, make sure the demographics pipeline has been run and all three demographics files were created successfully.

In [None]:
# Load the per-author demographic files produced by the demographic inference pipeline
ageDf, genderDf, eduLevelDf = (
    pd.read_pickle(os.path.join(demographicsDir, name))
    for name in ("authorAge.pickle", "authorGender.pickle", "authorEducationLevel.pickle")
)
# The education-level file stores its value as "ari_grade"; expose it as edu_level
eduLevelDf = eduLevelDf.rename(columns = {"ari_grade": "edu_level"})

for label, frame in (("Age", ageDf), ("Gender", genderDf), ("Education Level", eduLevelDf)):
  print(f"{label} df Shape:", frame.shape)

In [None]:
# Attach the demographic values to every tweet of each author.
# First drop any demographic columns left over from a previous run (e.g. when tweetsDf was
# re-loaded from processedTweets.parquet), so the merges do not create age_group_x /
# age_group_y duplicates. Each column is checked on its own, which also covers a frame where
# only some of them are present.
# NOTE(review): assumes author_id in the demographic pickles is str, like tweetsDf.author_id.
demographicCols = ["age_group", "gender", "edu_level"]
tweetsDf = tweetsDf.drop(columns = [c for c in demographicCols if c in tweetsDf.columns])

for demoDf, demoCol in ((ageDf, "age_group"), (genderDf, "gender"), (eduLevelDf, "edu_level")):
  tweetsDf = tweetsDf.merge(demoDf[["author_id", demoCol]], on = ["author_id"], how = "left")

print("Shape after adding the demographics", tweetsDf.shape)

### TensiStrength Score
- Run the TensiStrength pipeline on timeline tweets with text
- Make sure the TensiStrength pipeline generated its files successfully before running this part

In [None]:
# Load the TensiStrength output for each disorder and keep only the id and score columns
tensiCols = ["id", "author_id", "relax_score_org", "stress_score_org", "relax_score", "stress_score", "combined_score"]
tensiByDisorder = {}
for tensiDisorder in ["depression", "anxiety", "ptsd"]:
  tensiDf = pd.read_pickle(os.path.join(tensiStrenghtDir, f"{tensiDisorder}FullWithTensiScore.pickle"))
  print(f"Shape ({tensiDisorder}):", tensiDf.shape)
  tensiByDisorder[tensiDisorder] = tensiDf[tensiCols]

# Combine and keep one score row per tweet id. The order matters: drop_duplicates keeps the
# first occurrence, so anxiety scores win over depression, and depression over PTSD.
# (missingWithTensiDf — presumably the re-scored missing tweets — could be appended here.)
tensiStrenghtDf = pd.concat([tensiByDisorder[d] for d in ["anxiety", "depression", "ptsd"]])
tensiStrenghtDf = tensiStrenghtDf.drop_duplicates(subset = ["id"])
print("Tensi df Shape:", tensiStrenghtDf.shape)

# Merge with tweetsDf based on id.
# Always drop stale score columns first and then merge. A re-run on an already-scored
# tweetsDf then refreshes the scores instead of only removing them.
cols = ["id", "relax_score_org", "stress_score_org", "relax_score", "stress_score", "combined_score"]
scoreCols = cols[1:]
tweetsDf = tweetsDf.drop(columns = [c for c in scoreCols if c in tweetsDf.columns])
tweetsDf = tweetsDf.merge(tensiStrenghtDf[cols], on = ["id"], how = "left")
print("Shape of tweets df after adding the tensiStrenght score", tweetsDf.shape)

#### Validate

In [None]:
# Tweets of the three disorders that should have a TensiStrength score (English, non-null
# cleaned text) but do not.
missingTensiDf = tweetsDf[
    tweetsDf.disorder.isin(["depression", "anxiety", "ptsd"])
    & tweetsDf.combined_score.isna()
    & (tweetsDf.lang == "en")
    & tweetsDf.cleaned_text.notna()
]

missingCount = missingTensiDf.shape[0]
print("Missing Tensi scores:", missingCount)

# Pickle the missing rows (presumably to be scored by the TensiStrength pipeline)
if missingCount > 0:
  fileName = os.path.join(tensiStrenghtDir, "missingTensiDf.pickle")
  missingTensiDf.to_pickle(fileName)
  print("Shape of the missing Tensi scores df:", missingTensiDf.shape)

# Show one example row (an empty frame when nothing is missing). This has to be the cell's
# last top-level expression; an expression inside an `if` block is not displayed.
missingTensiDf.sample(n = min(1, missingCount))

### User/Authors Info

#### Original List

In [None]:
# Load the author profiles for both groups and tag each with its group
controlUsers = pd.read_csv(os.path.join(authorsDir, "authors_control.csv"))
controlUsers["group"] = 0
print("Control users shape:", controlUsers.shape)
treatmentUsers = pd.read_csv(os.path.join(authorsDir, "authors_treatment.csv"))
treatmentUsers["group"] = 1
print("Treatment users shape:", treatmentUsers.shape)

# Combine with a fresh index (otherwise the labels 0..n of both files would repeat)
usersDf = pd.concat([controlUsers, treatmentUsers], ignore_index = True)

# Remove the unnecessary columns
usersDf = usersDf.drop(columns = ["anonymized_id", "matched_author_count", "matched_author_ids", "location", "username"])

# Account creation date as tz-naive timestamps
usersDf["created_at"] = pd.to_datetime(usersDf["created_at"]).dt.tz_localize(None)
# Account age in years relative to the moment the notebook runs (so it changes between runs);
# accounts without a creation date get 0. Assigned back, not filled in place: a chained
# inplace fillna does not modify usersDf under pandas Copy-on-Write.
usersDf["account_age"] = ((pd.to_datetime("today") - usersDf["created_at"]).dt.days / 365.0).fillna(0)

# Counts: missing -> 0, stored as int64
countCols = ["followers_count", "following_count", "tweet_count"]
usersDf[countCols] = usersDf[countCols].fillna(0).astype("int64")

# Fill the remaining gaps with "" in every non-datetime column. Datetime columns are skipped
# so that created_at stays datetime64 (NaT stays NaT) instead of becoming a mixed
# Timestamp/"" object column.
fillCols = usersDf.select_dtypes(exclude = ["datetime", "datetimetz"]).columns
usersDf = usersDf.fillna({c: "" for c in fillCols})

# Number of whitespace-separated words in the profile description
usersDf["description_len"] = usersDf["description"].apply(lambda x: len(x.split()))

# Reorganize the columns (the disorder is added later from the PSM matching set),
# then rename id -> author_id and created_at -> author_since
reOrg = ['id', 'created_at', "account_age", 'verified', 'name', 'description', "description_len", 'group', 'followers_count', 'following_count', 'tweet_count']
usersDf = usersDf[reOrg].rename(columns = {"id": "author_id", "created_at": "author_since"})
# author_id is a string key, matching tweetsDf.author_id
usersDf["author_id"] = usersDf["author_id"].astype("str")
print("Full dataset shape:", usersDf.shape)

#### PSM
Use the output of the propensity score matching (PSM) for more refined results.

In [None]:
# Ingest the propensity-score-matching output. For each disorder, every file in its strata
# directory is one stratum, numbered 0, 1, 2, ... in sorted file-name order.
# NOTE(review): sorted() is lexicographic ("10.json" sorts before "2.json"), and every entry in
# the directory is read as JSON — confirm the file naming/contents match this.
disorders = {"anxiety": anxietyStrataDir, "depression": depressionStrataDir, "ptsd": ptsdStrataDir}
stratumFrames = []
for disorder, strataDir in disorders.items():
  files = sorted(os.listdir(strataDir))
  if not files:
    raise FileNotFoundError(f"No stratum files found for {disorder} in {strataDir}")
  for stratum, fileName in enumerate(files):
    with open(os.path.join(strataDir, fileName)) as f:
      data = json.load(f)
    stratumDf = pd.DataFrame(data)
    stratumDf["stratum"] = stratum
    stratumFrames.append(stratumDf)

# Concatenate once (growing a frame with concat inside the loop is quadratic), with a fresh index
matchingDf = pd.concat(stratumFrames, ignore_index = True)
# Rename columns to the notebook's naming convention; author_id is a string key
matchingDf = matchingDf.rename(columns = {"Author_id": "author_id", "Disorder": "disorder", "Class": "group"})
matchingDf["author_id"] = matchingDf["author_id"].astype("str")

# Drop the columns which are not needed
matchingDf = matchingDf.drop(columns = ["Feature_Vector", "Propensity_Score"])

# Check: matched authors per disorder (rows) and group (columns)
print("Resulting shape:", matchingDf.shape)
matchingDf.groupby(["disorder", "group"])["author_id"].count().reset_index().pivot(index = "disorder", columns = "group", values = "author_id")

In [None]:
# Keep only PSM-matched authors: attach the matching disorder and stratum, drop authors with
# no stratum (i.e. absent from the matching output), and store the stratum as int64.
usersDf = (
    pd.merge(usersDf, matchingDf[["author_id", "disorder", "stratum"]], how = "left", on = "author_id")
    .dropna(subset = ["stratum"])
    .astype({"stratum": "int64"})
)
print("Shape after adding the matching info", usersDf.shape)
# Authors per disorder (rows) and group (columns)
usersDf.groupby(["disorder", "group"])["author_id"].count().unstack("group")

In [None]:
# Attach author-level info (account creation date, PSM disorder and stratum) to every tweet.
# Drop whichever of these author columns are left over from a previous run, so the merge does
# not create author_since_x / stratum_x duplicates. Checking each column separately also
# handles a frame where only one of them is present.
staleAuthorCols = [c for c in ["author_since", "stratum"] if c in tweetsDf.columns]
tweetsDf = tweetsDf.drop(columns = staleAuthorCols)

tweetsDf = tweetsDf.merge(usersDf[["author_id", "author_since", "disorder", "stratum"]], on = ["author_id"], how = "left")
# Both frames carry a disorder column: _x is the tweet's label, _y the one from usersDf (PSM).
tweetsDf = tweetsDf.rename(columns = {"disorder_x": "orig_disorder", "disorder_y": "corrected_disorder"})
# Prefer the PSM disorder; fall back to the tweet's label for authors not in usersDf
tweetsDf["corrected_disorder"] = tweetsDf["corrected_disorder"].fillna(tweetsDf["orig_disorder"])
tweetsDf = tweetsDf.rename(columns = {"corrected_disorder": "disorder"}).drop(columns = ["orig_disorder"])

# Check
print("Shape after adding the user info", tweetsDf.shape)
tweetsDf.sample()

##### Validate

In [None]:
# Join the PSM records with usersDf on author_id and keep pairs whose disorder differs.
# NOTE(review): usersDf.disorder itself was merged in from matchingDf, so a mismatch presumably
# means an author appears in the matching output under more than one disorder; the
# "disorder_original" label may therefore be misleading.
mismatchedDisordersDf = pd.merge(matchingDf, usersDf[["author_id", "disorder"]], how = "inner", on = "author_id")
isMismatch = mismatchedDisordersDf["disorder_x"] != mismatchedDisordersDf["disorder_y"]
mismatchedDisordersDf = mismatchedDisordersDf.loc[isMismatch].rename(
    columns = {"disorder_x": "disorder_matching", "disorder_y": "disorder_original"}
)
print("Shape:", mismatchedDisordersDf.shape)
mismatchedDisordersDf.head(1)

In [None]:
# For each mismatched author, add the most frequent disorder label on their tweets
# (pd.Series.mode returns several values when there is a tie)
mismatchedTweets = tweetsDf[tweetsDf.author_id.isin(mismatchedDisordersDf.author_id)]
tweetsDisorderMode = (
    mismatchedTweets
    .groupby(["author_id"])["disorder"]
    .agg(pd.Series.mode)
    .reset_index()
    .rename(columns = {"disorder": "disorder_tweets"})
)
mismatchedDisordersDf.merge(tweetsDisorderMode, on = "author_id", how = "left")

In [None]:
# Authors whose tweets carry more than one distinct disorder label
disorderCounts = tweetsDf.groupby(by = ["author_id"])["disorder"].nunique()
incorrectDisordersDf = disorderCounts[disorderCounts > 1].reset_index()
print("Shape:", incorrectDisordersDf.shape)
incorrectDisordersDf.head(1)

In [None]:
# Check whether the multi-disorder authors above are PSM-matched:
# number of their tweets (rows) that have a stratum
tweetsDf.loc[tweetsDf.author_id.isin(incorrectDisordersDf.author_id), "stratum"].notna().sum()

In [None]:
# Multi-disorder authors that also appear in mismatchedDisordersDf. If the row count equals
# mismatchedDisordersDf.author_id.nunique(), the mismatched authors are a subset of them.
inMismatched = incorrectDisordersDf["author_id"].isin(mismatchedDisordersDf["author_id"])
incorrectDisordersDf.loc[inMismatched].shape

#### Save users

In [None]:
# Persist the PSM-matched authors table as parquet
fileName = os.path.join(authorsDir, "Authors.parquet")
usersDf.to_parquet(path = fileName)

In [None]:
# Saved authors per disorder (rows) and group (columns)
usersDf.groupby(["disorder", "group"])["author_id"].count().unstack("group")

### Final Touches

In [None]:
# Spot-check one random row of the assembled dataset
tweetsDf.sample(n = 1)

Unnamed: 0,id,tweet_type,referenced_tweet_type,created_at,lang,author_id,text,cleaned_text,retweet_count,reply_count,...,stress_score_org,relax_score,stress_score,combined_score,age_group,gender,edu_level,author_since,disorder,stratum
12601561,1328498608019099648,timeline,replied_to,2020-11-17 00:42:05,en,24323669,@melissita1 @RawBeautyKristi It saved my best ...,it saved my best friends baby.,0,0,...,-1.0,1.0,0.0,0.375,,,,NaT,depression,


In [None]:
# Inspect the column dtypes before the final type fixes below
tweetsDf.dtypes

Unnamed: 0,0
id,int64
tweet_type,object
referenced_tweet_type,object
created_at,datetime64[ns]
lang,object
author_id,object
text,object
cleaned_text,object
retweet_count,int64
reply_count,int64


In [None]:
# Final dtype / missing-value fixes:
# - diagnosis_date (python date objects after the diagnosis-date step) becomes datetime64
# - missing referenced_tweet_type values are set to "original"
tweetsDf = tweetsDf.assign(
    diagnosis_date = pd.to_datetime(tweetsDf["diagnosis_date"]),
    referenced_tweet_type = tweetsDf["referenced_tweet_type"].fillna("original"),
)

In [None]:
# List the current columns (input for the final column order below)
tweetsDf.columns

Index(['id', 'tweet_type', 'referenced_tweet_type', 'created_at', 'lang',
       'author_id', 'text', 'cleaned_text', 'retweet_count', 'reply_count',
       'like_count', 'quote_count', 'source', 'group', 'diagnosis_date',
       'treatment_date', 'music_id', 'relax_score_org', 'stress_score_org',
       'relax_score', 'stress_score', 'combined_score', 'age_group', 'gender',
       'edu_level', 'author_since', 'disorder', 'stratum'],
      dtype='object')

In [None]:
# Final column order of the master dataset
cols = [
    # tweet and author identifiers
    'id', 'tweet_type', 'referenced_tweet_type', 'created_at', 'lang',
    'disorder', 'author_id', 'author_since',
    # content and engagement
    'text', 'cleaned_text', 'retweet_count', 'reply_count', 'like_count', 'quote_count', 'source',
    # study design: group, PSM stratum, dates, music session
    'group', 'stratum', 'diagnosis_date', 'treatment_date', 'music_id',
    # TensiStrength scores
    'relax_score_org', 'stress_score_org', 'relax_score', 'stress_score', 'combined_score',
    # inferred demographics
    'age_group', 'gender', 'edu_level',
]
tweetsDf = tweetsDf.loc[:, cols]

In [None]:
# Distinct authors with a PSM stratum; should equal the number of PSM-matched users
tweetsDf.loc[tweetsDf["stratum"].notna(), "author_id"].nunique()

### Save
Main and individual disorder files

In [None]:
# Persist the master dataset as parquet
fileName = os.path.join(tweetsDir, "processedTweets.parquet")
tweetsDf.to_parquet(path = fileName)

In [None]:
# Save the anxiety subset (all columns, including the TensiStrength scores).
# NOTE: this overwrites the anxiety file that the TensiStrength section reads its scores from.
disorderMask = tweetsDf["disorder"] == "anxiety"
anxietyWithTensiScoreDf = tweetsDf.loc[disorderMask].drop_duplicates()
fileName = os.path.join(tensiStrenghtDir, "anxietyFullWithTensiScore.pickle")
anxietyWithTensiScoreDf.to_pickle(path = fileName)
print("Shape:", anxietyWithTensiScoreDf.shape)

Shape: (3346488, 28)


In [None]:
# Save the depression subset (all columns, including the TensiStrength scores).
# NOTE: this overwrites the depression file that the TensiStrength section reads its scores from.
disorderMask = tweetsDf["disorder"] == "depression"
depressionWithTensiScoreDf = tweetsDf.loc[disorderMask].drop_duplicates()
fileName = os.path.join(tensiStrenghtDir, "depressionFullWithTensiScore.pickle")
depressionWithTensiScoreDf.to_pickle(path = fileName)
print("Shape:", depressionWithTensiScoreDf.shape)

Shape: (7208458, 28)


In [None]:
# Save the PTSD subset (all columns, including the TensiStrength scores).
# NOTE: this overwrites the PTSD file that the TensiStrength section reads its scores from.
disorderMask = tweetsDf["disorder"] == "ptsd"
ptsdWithTensiScoreDf = tweetsDf.loc[disorderMask].drop_duplicates()
fileName = os.path.join(tensiStrenghtDir, "ptsdFullWithTensiScore.pickle")
ptsdWithTensiScoreDf.to_pickle(path = fileName)
print("Shape:", ptsdWithTensiScoreDf.shape)

Shape: (2967408, 28)
