## A) Prepare Data

In [None]:
# Install PySpark into the Colab runtime; it must be present before the
# pyspark imports in the next cell can succeed.
!pip install pyspark



In [None]:
import os, shutil
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [None]:
# Attach Google Drive so that everything written below outlives the session
drive.mount("/content/drive")

# Root directory (inside Drive) for every artefact this notebook produces
drive_folder = "/content/drive/MyDrive/Datakind"

# Ensure the root directory is present; an existing one is left untouched
os.makedirs(drive_folder, exist_ok=True)

Mounted at /content/drive


In [None]:
# Set this to the location of the original dataset provided by DataKind.
# The file must first be uploaded to your Google Drive so Colab can read it.
original_data_path = '/content/drive/MyDrive/Learning AI/Projects/Datakind Farmer Project/data/b0cd514b-b9cc-4972-a0c2-c91726e6d825.csv'

In [None]:
# Output locations derived from the Drive project folder
data_path = f"{drive_folder}/data"
parquet_output_path = f"{data_path}/parquet_data"

In [None]:
# Start the Spark session used by the notebook (an already-running session is reused)
spark = (
    SparkSession.builder
    .appName("CSV_to_Parquet")
    .getOrCreate()
)

In [None]:
# Load the raw CSV export: first line is the header, column types are inferred.
# NOTE(review): no multiLine/escape options are set — confirm the text columns
# never contain embedded newlines or quotes that would break row parsing.
csv_reader = spark.read.options(header="true", inferSchema="true")
df = csv_reader.csv(original_data_path)

In [None]:
# Persist the raw data as Parquet, replacing any output from an earlier run
parquet_writer = df.write.mode("overwrite")
parquet_writer.parquet(parquet_output_path)

print("Data saved in the paraquet", parquet_output_path)

Data saved in the paraquet /content/drive/MyDrive/Datakind/data/parquet_data


## B) Split questions based on language

In [None]:
# Load the Parquet dataset written in section A
df = spark.read.parquet(parquet_output_path)

# Preview five untruncated rows, then list column names and types
df.show(n=5, truncate=False)
df.printSchema()

+-----------+----------------+-----------------+-----------------------------------------------------------------------------------------------------------+--------------+-----------------------------+-----------+----------------+-----------------+--------------------------------------------------------------------------------------+--------------+-----------------------------+------------------+--------------------+--------------------------+--------------------+-----------------+-----------------------------+------------------+--------------------+--------------------------+--------------------+-----------------+-----------------------------+
|question_id|question_user_id|question_language|question_content                                                                                           |question_topic|question_sent                |response_id|response_user_id|response_language|response_content                                                                      |response_top

### Remove duplicates

In [None]:
# Drop rows that are identical across every column.
# (Pass a list of column names to deduplicate on a subset instead.)
df_clean = df.drop_duplicates()

# Preview the deduplicated data
df_clean.show()

+-----------+----------------+-----------------+--------------------+--------------+--------------------+-----------+----------------+-----------------+--------------------+--------------+--------------------+------------------+--------------------+--------------------------+--------------------+-----------------+------------------------+------------------+--------------------+--------------------------+--------------------+-----------------+------------------------+
|question_id|question_user_id|question_language|    question_content|question_topic|       question_sent|response_id|response_user_id|response_language|    response_content|response_topic|       response_sent|question_user_type|question_user_status|question_user_country_code|question_user_gender|question_user_dob|question_user_created_at|response_user_type|response_user_status|response_user_country_code|response_user_gender|response_user_dob|response_user_created_at|
+-----------+----------------+-----------------+--------

In [None]:
# Row count once exact duplicates are gone (triggers a full Spark job)
dedup_row_count = df_clean.count()
print("Total after remove doplicates:", dedup_row_count)

### Make folder for output

In [None]:
# Per-language question exports are written under this Drive directory
questions_by_language_folder = f"{drive_folder}/questions_by_language"

# Ensure it is present; an existing directory is left untouched
os.makedirs(questions_by_language_folder, exist_ok=True)

### 1- SWA language

In [None]:
# Where Spark writes its part files (local disk) and where the final CSV goes (Drive)
swa_temp_folder = "/content/swa_temp_csv"
swa_data = f"{questions_by_language_folder}/swa_data.csv"

# Keep only rows whose question language code is "swa"
df_swa = df_clean.where(F.col("question_language") == "swa")

# A single partition makes Spark emit exactly one CSV part file
(
    df_swa.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(swa_temp_folder)
)

# Relocate the part file to Drive under its final name
csv_parts = [name for name in os.listdir(swa_temp_folder) if name.endswith(".csv")]
for part_name in csv_parts:
    shutil.move(os.path.join(swa_temp_folder, part_name), swa_data)

# Discard Spark's leftover bookkeeping files
shutil.rmtree(swa_temp_folder)

print(f"Swahili data saved to {swa_data}")

Swahili data saved to /content/drive/MyDrive/Datakind/questions_by_language/swa_data.csv


### 2- NYN language

In [None]:
# Where Spark writes its part files (local disk) and where the final CSV goes (Drive)
nyn_temp_folder = "/content/nyn_temp_csv"
nyn_data = f"{questions_by_language_folder}/nyn_data.csv"

# Keep only rows whose question language code is "nyn"
df_nyn = df_clean.where(F.col("question_language") == "nyn")

# A single partition makes Spark emit exactly one CSV part file
(
    df_nyn.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(nyn_temp_folder)
)

# Relocate the part file to Drive under its final name
csv_parts = [name for name in os.listdir(nyn_temp_folder) if name.endswith(".csv")]
for part_name in csv_parts:
    shutil.move(os.path.join(nyn_temp_folder, part_name), nyn_data)

# Discard Spark's leftover bookkeeping files
shutil.rmtree(nyn_temp_folder)

print(f"Nyn data saved to {nyn_data}")

Nyn data saved to /content/drive/MyDrive/Datakind/questions_by_language/nyn_data.csv


### 3- LUG Language

In [None]:
# Keep only rows whose question language code is "lug".
# Filter the deduplicated frame (df_clean), as the other language exports do;
# filtering the raw `df` would put duplicate rows back into the Luganda file.
df_lug = df_clean.filter(col("question_language") == "lug")

# Path to temporary CSV folder (local Colab disk, not Drive)
lug_temp_folder = "/content/lug_temp_csv"

# Coalesce into 1 partition so Spark writes a single CSV part file
df_lug.coalesce(1).write.mode("overwrite").option("header", True).csv(lug_temp_folder)

# Target path in your Google Drive
lug_data = questions_by_language_folder+"/lug_data.csv"

# Move the part file to Drive under its final name
for filename in os.listdir(lug_temp_folder):
    if filename.endswith(".csv"):
        shutil.move(os.path.join(lug_temp_folder, filename), lug_data)

# Remove temporary folder (Spark's _SUCCESS / .crc leftovers)
shutil.rmtree(lug_temp_folder)

print(f"Lug data saved to {lug_data}")

Lug data saved to /content/drive/MyDrive/Datakind/questions_by_language/lug_data.csv


### 4- ENG Language

In [None]:
# Where Spark writes its part files (local disk) and where the final CSV goes (Drive)
eng_temp_folder = "/content/eng_temp_csv"
eng_data = f"{questions_by_language_folder}/eng_data.csv"

# Keep only rows whose question language code is "eng"
df_eng = df_clean.where(F.col("question_language") == "eng")

# A single partition makes Spark emit exactly one CSV part file
(
    df_eng.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(eng_temp_folder)
)

# Relocate the part file to Drive under its final name
csv_parts = [name for name in os.listdir(eng_temp_folder) if name.endswith(".csv")]
for part_name in csv_parts:
    shutil.move(os.path.join(eng_temp_folder, part_name), eng_data)

# Discard Spark's leftover bookkeeping files
shutil.rmtree(eng_temp_folder)

print(f"Eng data saved to {eng_data}")

Eng data saved to /content/drive/MyDrive/Datakind/questions_by_language/eng_data.csv


## C) Get top words in every language

### Make folder for output

In [None]:
# Target folder in Drive for the per-language word-count CSVs
words_by_language_folder = drive_folder + '/words_by_language'

# Create the words folder if it doesn't exist. The word-count cells move
# their CSV into this folder with shutil.move, so it has to exist beforehand.
os.makedirs(words_by_language_folder, exist_ok=True)

### 1- SWA Language

In [None]:
# Swahili: raw Parquet copy, cleaned Parquet output, and word-count CSV target
swa_parquet_path = f"{data_path}/swa_data.parquet"
swa_cleaned_output_path = f"{data_path}/swa_cleaned_data.parquet"
swa_all_words_path = f"{words_by_language_folder}/swa_all_words.csv"

Read CSV

In [None]:
# Load the Swahili export; header row present, column types inferred
df = spark.read.options(header=True, inferSchema=True).csv(swa_data)

Convert the CSV to Parquet so the data can be read as Parquet for faster processing

In [None]:
# Store the Swahili rows as Parquet, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(swa_parquet_path)
)

In [None]:
# No stop-word list is available for this language yet, so nothing is removed
stopwords = list()

In [None]:
# Start from the persisted Parquet copy so re-running this cell is repeatable
df = spark.read.parquet(swa_parquet_path)

text_cols = ["question_content", "response_content"]

# NULL text would propagate through the string functions, so blank it first
df = df.fillna({name: "" for name in text_cols})

# Replace everything except ASCII letters and spaces, then squeeze whitespace
for name in text_cols:
    letters_only = F.regexp_replace(name, "[^A-Za-z ]", " ")
    df = df.withColumn(name, F.regexp_replace(letters_only, "\\s+", " "))

# Question and response joined into one text column
df = df.withColumn("text_all", F.concat_ws(" ", *text_cols))

# (source text column, token column, filtered-word column)
stages = [
    ("text_all", "tokens_all", "words_all"),
    ("question_content", "tokens_q", "words_q"),
    ("response_content", "tokens_r", "words_r"),
]

# Split each text column on whitespace
for source_col, token_col, _ in stages:
    splitter = RegexTokenizer(inputCol=source_col, outputCol=token_col, pattern="\\s+")
    df = splitter.transform(df)

# Drop the tokens listed in `stopwords`
for _, token_col, word_col in stages:
    stop_filter = StopWordsRemover(stopWords=stopwords, inputCol=token_col, outputCol=word_col)
    df = stop_filter.transform(df)

# Discard one-character tokens
for _, _, word_col in stages:
    df = df.withColumn(word_col, F.expr(f"filter({word_col}, x -> length(x) > 1)"))

Save the data to a parquet after cleaning it

In [None]:
# Persist the cleaned Swahili data, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(swa_cleaned_output_path)
)

Read the cleaned parquet data

In [None]:
# Reload the cleaned Swahili data and inspect a few rows plus the schema
df_cleaned = spark.read.parquet(swa_cleaned_output_path)
df_cleaned.show(n=5)
df_cleaned.printSchema()

+-----------+----------------+-----------------+--------------------+--------------+--------------------+-----------+----------------+-----------------+--------------------+--------------+--------------------+------------------+--------------------+--------------------------+--------------------+-----------------+------------------------+------------------+--------------------+--------------------------+--------------------+-----------------+------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|question_id|question_user_id|question_language|    question_content|question_topic|       question_sent|response_id|response_user_id|response_language|    response_content|response_topic|       response_sent|question_user_type|question_user_status|question_user_country_code|question_user_gender|question_user_dob|question_user_created_at|response_user_type|response_user_status|r

Count all the words

In [None]:
# One row per word from the combined question+response tokens, empties dropped
swa_words_all = (
    df_cleaned
    .select(F.explode("words_all").alias("word"))
    .where(F.length("word") > 0)
)

# Occurrences per word, most frequent first
swa_word_counts_all = (
    swa_words_all
    .groupBy("word")
    .count()
    .orderBy(F.col("count").desc())
)

Write it to a CSV file

In [None]:
# Spark writes its part files here (local disk) before the move to Drive
swa_words_temp_folder = "/content/swa_words_temp_csv"

# A single partition makes Spark emit exactly one CSV part file
(
    swa_word_counts_all.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(swa_words_temp_folder)
)

# Relocate the part file to Drive under its final name
csv_parts = [name for name in os.listdir(swa_words_temp_folder) if name.endswith(".csv")]
for part_name in csv_parts:
    shutil.move(os.path.join(swa_words_temp_folder, part_name), swa_all_words_path)

# Discard Spark's leftover bookkeeping files
shutil.rmtree(swa_words_temp_folder)

print(f"Swahili words data saved to {swa_all_words_path}")

Swahili words data saved to /content/drive/MyDrive/Datakind/words_by_language/swa_all_words.csv


### 2- NYN Language

In [None]:
# Nyn: raw Parquet copy, cleaned Parquet output, and word-count CSV target
nyn_parquet_path = f"{data_path}/nyn_data.parquet"
nyn_cleaned_output_path = f"{data_path}/nyn_cleaned_data.parquet"
nyn_all_words_path = f"{words_by_language_folder}/nyn_all_words.csv"

Read CSV

In [None]:
# Load the Nyn export; header row present, column types inferred
df = spark.read.options(header=True, inferSchema=True).csv(nyn_data)

Convert the CSV to Parquet so the data can be read as Parquet for faster processing

In [None]:
# Store the Nyn rows as Parquet, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(nyn_parquet_path)
)

In [None]:
# No stop-word list is available for this language yet, so nothing is removed
stopwords = list()

In [None]:
# Start from the persisted Parquet copy so re-running this cell is repeatable
df = spark.read.parquet(nyn_parquet_path)

text_cols = ["question_content", "response_content"]

# NULL text would propagate through the string functions, so blank it first
df = df.fillna({name: "" for name in text_cols})

# Replace everything except ASCII letters and spaces, then squeeze whitespace
for name in text_cols:
    letters_only = F.regexp_replace(name, "[^A-Za-z ]", " ")
    df = df.withColumn(name, F.regexp_replace(letters_only, "\\s+", " "))

# Question and response joined into one text column
df = df.withColumn("text_all", F.concat_ws(" ", *text_cols))

# (source text column, token column, filtered-word column)
stages = [
    ("text_all", "tokens_all", "words_all"),
    ("question_content", "tokens_q", "words_q"),
    ("response_content", "tokens_r", "words_r"),
]

# Split each text column on whitespace
for source_col, token_col, _ in stages:
    splitter = RegexTokenizer(inputCol=source_col, outputCol=token_col, pattern="\\s+")
    df = splitter.transform(df)

# Drop the tokens listed in `stopwords`
for _, token_col, word_col in stages:
    stop_filter = StopWordsRemover(stopWords=stopwords, inputCol=token_col, outputCol=word_col)
    df = stop_filter.transform(df)

# Discard one-character tokens
for _, _, word_col in stages:
    df = df.withColumn(word_col, F.expr(f"filter({word_col}, x -> length(x) > 1)"))

Save the data to a parquet after cleaning it

In [None]:
# Persist the cleaned Nyn data, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(nyn_cleaned_output_path)
)

Read the cleaned parquet data

In [None]:
# Reload the cleaned Nyn data and inspect a few rows plus the schema
df_cleaned = spark.read.parquet(nyn_cleaned_output_path)
df_cleaned.show(n=5)
df_cleaned.printSchema()

Count all the words

In [None]:
# One row per word from the combined question+response tokens, empties dropped
nyn_words_all = (
    df_cleaned
    .select(F.explode("words_all").alias("word"))
    .where(F.length("word") > 0)
)

# Occurrences per word, most frequent first
nyn_word_counts_all = (
    nyn_words_all
    .groupBy("word")
    .count()
    .orderBy(F.col("count").desc())
)


Write it to a CSV file

In [None]:
# Spark writes its part files here (local disk) before the move to Drive
nyn_words_temp_folder = "/content/nyn_words_temp_csv"

# A single partition makes Spark emit exactly one CSV part file
(
    nyn_word_counts_all.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(nyn_words_temp_folder)
)

# Relocate the part file to Drive under its final name
csv_parts = [name for name in os.listdir(nyn_words_temp_folder) if name.endswith(".csv")]
for part_name in csv_parts:
    shutil.move(os.path.join(nyn_words_temp_folder, part_name), nyn_all_words_path)

# Discard Spark's leftover bookkeeping files
shutil.rmtree(nyn_words_temp_folder)

print(f"Nyn words data saved to {nyn_all_words_path}")

### 3- LUG Language

In [None]:
# Luganda: raw Parquet copy, cleaned Parquet output, and word-count CSV target
lug_parquet_path = f"{data_path}/lug_data.parquet"
lug_cleaned_output_path = f"{data_path}/lug_cleaned_data.parquet"
lug_all_words_path = f"{words_by_language_folder}/lug_all_words.csv"

Read CSV

In [None]:
# Load the Luganda export; header row present, column types inferred
df = spark.read.options(header=True, inferSchema=True).csv(lug_data)

Convert the CSV to Parquet so the data can be read as Parquet for faster processing

In [None]:
# Store the Luganda rows as Parquet, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(lug_parquet_path)
)

In [None]:
# No stop-word list is available for this language yet, so nothing is removed
stopwords = list()

In [None]:
# Start from the persisted Parquet copy so re-running this cell is repeatable
df = spark.read.parquet(lug_parquet_path)

text_cols = ["question_content", "response_content"]

# NULL text would propagate through the string functions, so blank it first
df = df.fillna({name: "" for name in text_cols})

# Replace everything except ASCII letters and spaces, then squeeze whitespace
for name in text_cols:
    letters_only = F.regexp_replace(name, "[^A-Za-z ]", " ")
    df = df.withColumn(name, F.regexp_replace(letters_only, "\\s+", " "))

# Question and response joined into one text column
df = df.withColumn("text_all", F.concat_ws(" ", *text_cols))

# (source text column, token column, filtered-word column)
stages = [
    ("text_all", "tokens_all", "words_all"),
    ("question_content", "tokens_q", "words_q"),
    ("response_content", "tokens_r", "words_r"),
]

# Split each text column on whitespace
for source_col, token_col, _ in stages:
    splitter = RegexTokenizer(inputCol=source_col, outputCol=token_col, pattern="\\s+")
    df = splitter.transform(df)

# Drop the tokens listed in `stopwords`
for _, token_col, word_col in stages:
    stop_filter = StopWordsRemover(stopWords=stopwords, inputCol=token_col, outputCol=word_col)
    df = stop_filter.transform(df)

# Discard one-character tokens
for _, _, word_col in stages:
    df = df.withColumn(word_col, F.expr(f"filter({word_col}, x -> length(x) > 1)"))

Save the data to a parquet after cleaning it

In [None]:
# Persist the cleaned Luganda data, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(lug_cleaned_output_path)
)

Read the cleaned parquet data

In [None]:
# Reload the cleaned Luganda data and inspect a few rows plus the schema
df_cleaned = spark.read.parquet(lug_cleaned_output_path)
df_cleaned.show(n=5)
df_cleaned.printSchema()

Count all the words

In [None]:
# One row per word from the combined question+response tokens, empties dropped
lug_words_all = (
    df_cleaned
    .select(F.explode("words_all").alias("word"))
    .where(F.length("word") > 0)
)

# Occurrences per word, most frequent first
lug_word_counts_all = (
    lug_words_all
    .groupBy("word")
    .count()
    .orderBy(F.col("count").desc())
)


Write it to a CSV file

In [None]:
# Spark writes its part files here (local disk) before the move to Drive
lug_words_temp_folder = "/content/lug_words_temp_csv"

# A single partition makes Spark emit exactly one CSV part file
(
    lug_word_counts_all.coalesce(1)
    .write.mode("overwrite")
    .option("header", True)
    .csv(lug_words_temp_folder)
)

# Relocate the part file to Drive under its final name
csv_parts = [name for name in os.listdir(lug_words_temp_folder) if name.endswith(".csv")]
for part_name in csv_parts:
    shutil.move(os.path.join(lug_words_temp_folder, part_name), lug_all_words_path)

# Discard Spark's leftover bookkeeping files
shutil.rmtree(lug_words_temp_folder)

print(f"Lug words data saved to {lug_all_words_path}")

### 4- ENG Language

In [None]:
# English: raw Parquet copy, cleaned Parquet output, and word-count CSV target
eng_parquet_path = f"{data_path}/eng_data.parquet"
eng_cleaned_output_path = f"{data_path}/eng_cleaned_data.parquet"
eng_all_words_path = f"{words_by_language_folder}/eng_all_words.csv"

Read CSV

In [None]:
# Load the English export; header row present, column types inferred
df = spark.read.options(header=True, inferSchema=True).csv(eng_data)

Convert the CSV to Parquet so the data can be read as Parquet for faster processing

In [None]:
# Store the English rows as Parquet, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(eng_parquet_path)
)

In [None]:
# Stop words removed from the tokens in this (ENG) section. The list mixes
# English function words with Luganda ones, grouped by grammatical role, and
# several entries are repeated across groups.
# NOTE(review): the LUG section uses an empty stop-word list — confirm whether
# the Luganda entries below were meant to be applied there as well.
# NOTE: the cleaning step replaces every non-letter with a space before
# tokenizing, so entries containing an apostrophe ("ng'", "n'") never match.
stopwords = [
    # Articles & Determiners
    "omusolo", "eriringi", "bwe", "mu", "ki", "e", "n", "ne", "gwe", "ye",
    "a", "an", "the",

    # Common Prepositions
    "mu", "ku", "ko", "wa", "kwa", "ng'", "n'", "pa", "lu", "eri",
    "from", "in", "on", "at", "to", "by", "with", "through", "into",

    # Pronouns (Personal, Possessive, Reflexive)
    "nze", "yo", "gwe", "ye", "ffe", "mmwe", "bo",  # Personal
    "nino", "yino", "giino", "yino", "ffino", "mmwino", "bwino",  # Possessive
    "kyeyano", "weyano", "gyeyano", "yeyano",  # Reflexive
    "i", "me", "you", "he", "she", "it", "we", "they",
    "my", "your", "his", "her", "its", "our", "their",

    # Conjunctions & Connectors
    "naye", "ate", "era", "nebi", "wabula", "bwati", "omusolo",
    "and", "but", "or", "because", "so", "if", "when", "while",
    "therefore", "however", "moreover", "furthermore",

    # Be/Have/Do Verbs
    "kuba", "kufuna", "kukola", "kugeza", "kuva", "kyo", "kiri", "kali",
    "be", "is", "are", "was", "been", "have", "has", "do", "does",

    # Auxiliary Verbs & Modals
    "kyali", "kyakuba", "kyali", "kigeze", "kidde",
    "kyokka", "kigendereza", "kyeyinza",
    "can", "could", "will", "would", "shall", "should", "may", "might", "must",

    # Question Words (Wh-words)
    "ki", "ani", "lwaki", "luti", "katika", "buyinza",
    "what", "who", "which", "where", "when", "why", "how",

    # Demonstratives
    "guno", "oguno", "buno", "gwa", "ewa",
    "this", "that", "these", "those", "here", "there",

    # Quantifiers & Numbers (low-frequency content)
    "emu", "bbiri", "ssatu", "nnya", "mufu",
    "one", "two", "three", "some", "all", "each", "every", "few", "many", "most",

    # Negative Particles
    "te", "nte", "tewali", "tekitegeze", "tobulamu",
    "not", "no", "none", "nothing", "nowhere", "never",

    # Time-related (common fillers)
    "kati", "oluyimba", "bukedde", "we", "olunaku",
    "now", "then", "today", "tomorrow", "yesterday", "always", "never", "often",

    # Common Verbs (semantically weak)
    "kola", "koleddwa", "koze", "kolesa",  # Do/make
    "tuuka", "tuyita", "tuuse",  # Come/come back
    "genda", "gwe", "guze", "genseza",  # Go
    "laba", "labeddwa", "laze", "labisa",  # Look/see
    "tegeza", "tegeddwa", "tegeze",  # Understand
    "kuba", "kyali", "kyakuba",  # Be
    "funa", "funeddwa", "fuze", "funisa",  # Want
    "saba", "sabeddwa", "sabize",  # Ask
    "sabe", "sabededdwa", "sabire",  # Know
    "famba", "fambeddwa", "fambiza",  # Move
    "sika", "sikeddwa", "sikira",  # Stay
    "kira", "kireddwa", "kirizze",  # Die
    "sebeza", "sebezeeddwa", "sebezeddwa",  # Work

    # Generic Nouns (low content value)
    "kintu", "ekintu", "bintu", "emyaka", "omuntu", "abantu",
    "thing", "stuff", "person", "people", "way", "place", "time", "year",

    # Common Adjectives/Adverbs (filler words)
    "nkulu", "nnene", "ntono", "nnuma",
    "big", "small", "good", "bad", "more", "less", "very",

    # Degree & Intensifiers
    "kitone", "kalilema", "kaseera",
    "very", "too", "quite", "rather", "just", "only",

    # Additional Luganda-specific stops
    "eyaffe", "eyagwe", "kyewakyo",
    "okugera", "okukoma", "okutegeeza",
    "babiri", "bakopi", "bakaliwe",
    "bubiri", "bukopi", "bukalibw",
    "kale", "kati", "kabukabu",
    "nkasa", "manafu", "fulikifu",
    "banakubala", "mannya", "kuwunya",
    "kigazi", "siguzi", "namugosa",
    "njawulo", "kalaka", "namweli",
    "eyongeza", "eyanguyiza", "kyeyajja",
    "alirimu", "alibawo", "alimuva",

    # English-Luganda mixed common usage
    "yes", "no", "okay", "okay", "okay",
    "please", "thank", "thanks", "sorry",
    "mister", "miss", "sir", "madam",

    # More generic usage patterns
    "kubadde", "kitaasa", "kisaasa",
    "kwalubadde", "kwalubiri", "kwabiri",
    "nyingiliza", "yingiliza", "engineeza",
]

In [None]:
# Start from the persisted Parquet copy so re-running this cell is repeatable
df = spark.read.parquet(eng_parquet_path)

text_cols = ["question_content", "response_content"]

# NULL text would propagate through the string functions, so blank it first
df = df.fillna({name: "" for name in text_cols})

# Replace everything except ASCII letters and spaces, then squeeze whitespace
for name in text_cols:
    letters_only = F.regexp_replace(name, "[^A-Za-z ]", " ")
    df = df.withColumn(name, F.regexp_replace(letters_only, "\\s+", " "))

# Question and response joined into one text column
df = df.withColumn("text_all", F.concat_ws(" ", *text_cols))

# (source text column, token column, filtered-word column)
stages = [
    ("text_all", "tokens_all", "words_all"),
    ("question_content", "tokens_q", "words_q"),
    ("response_content", "tokens_r", "words_r"),
]

# Split each text column on whitespace
for source_col, token_col, _ in stages:
    splitter = RegexTokenizer(inputCol=source_col, outputCol=token_col, pattern="\\s+")
    df = splitter.transform(df)

# Drop the tokens listed in `stopwords`
for _, token_col, word_col in stages:
    stop_filter = StopWordsRemover(stopWords=stopwords, inputCol=token_col, outputCol=word_col)
    df = stop_filter.transform(df)

# Discard one-character tokens
for _, _, word_col in stages:
    df = df.withColumn(word_col, F.expr(f"filter({word_col}, x -> length(x) > 1)"))

Save the data to a parquet after cleaning it

In [None]:
# Persist the cleaned English data, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(eng_cleaned_output_path)
)

Read the cleaned parquet data

In [None]:
# Reload the cleaned English data and inspect a few rows plus the schema
df_cleaned = spark.read.parquet(eng_cleaned_output_path)
df_cleaned.show(n=5)
df_cleaned.printSchema()

Count all the words

In [None]:
# One row per word from the combined question+response tokens, empties dropped
eng_words_all = (
    df_cleaned
    .select(F.explode("words_all").alias("word"))
    .where(F.length("word") > 0)
)

# Occurrences per word, most frequent first
eng_word_counts_all = (
    eng_words_all
    .groupBy("word")
    .count()
    .orderBy(F.col("count").desc())
)


Write it to a CSV file

In [None]:
# Path to temporary CSV folder (local Colab disk, not Drive).
# Uses its own "eng" folder name rather than sharing the Luganda one.
eng_words_temp_folder = "/content/eng_words_temp_csv"

# Spark's .csv(path) creates a DIRECTORY at `path`. If an earlier run wrote
# straight to eng_all_words_path, that directory is still there and
# shutil.move would place the part file inside it instead of creating the
# CSV file, so remove it first.
if os.path.isdir(eng_all_words_path):
    shutil.rmtree(eng_all_words_path)

# Coalesce into 1 partition so Spark writes a single CSV part file
eng_word_counts_all.coalesce(1).write.mode("overwrite").option("header", True).csv(eng_words_temp_folder)

# Move the part file to Drive under its final name
for filename in os.listdir(eng_words_temp_folder):
    if filename.endswith(".csv"):
        shutil.move(os.path.join(eng_words_temp_folder, filename), eng_all_words_path)

# Remove temporary folder (Spark's _SUCCESS / .crc leftovers)
shutil.rmtree(eng_words_temp_folder)

print(f"Eng words data saved to {eng_all_words_path}")

## D) Visualization

**First of all, install and import the Python libraries**

In [None]:
# Install PySpark in the Colab runtime. This repeats the install from
# section A — presumably so this section can be run on a fresh runtime.
!pip install pyspark



**First, you need access to the file in Google Drive**

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the paths below can be read.
drive.mount('/content/drive')

Mounted at /content/drive


**Load the data into Spark**
If the data was shared with you, first create a shortcut to it in your own Google Drive so that it can be reached through the path below.

In [None]:
# Location of the English questions CSV in Drive.
# NOTE(review): this path has spaces around "Projects" (' Projects /') while
# parquet_path further down uses 'Projects/' — confirm which spelling is correct.
path = '/content/drive/MyDrive/ Projects /Datakind/data by languages/eng_data.csv'
extract_path = "/content/data_parquet"
# "/content/data_parquet" is the folder to extract files into; it does not need to exist beforehand because it will be created automatically.

**Create a Spark session to do all the Spark work**

In [None]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this section (an already-running session is reused)
spark = SparkSession.builder.appName("FarmerQANDA").getOrCreate()


**Load the data in df by spark function**

In [None]:
# Load the CSV at `path`; header row present, column types inferred
df = spark.read.options(header=True, inferSchema=True).csv(path)

**Parquet path**
This is a path in your Google Drive where the Parquet data will be written. The path does not need to exist beforehand; it will be created automatically.

In [None]:
# Drive location where the Parquet copy of the English data is written
parquet_path = "/content/drive/MyDrive/Projects/Datakind/prequet_data/eng_data.parquet"

**Just to explore your data**

In [None]:
# Column names and types first, then a five-row preview
df.printSchema()
df.show(n=5)


root
 |-- question_id: integer (nullable = true)
 |-- question_user_id: integer (nullable = true)
 |-- question_language: string (nullable = true)
 |-- question_content: string (nullable = true)
 |-- question_topic: string (nullable = true)
 |-- question_sent: string (nullable = true)
 |-- response_id: string (nullable = true)
 |-- response_user_id: string (nullable = true)
 |-- response_language: string (nullable = true)
 |-- response_content: string (nullable = true)
 |-- response_topic: string (nullable = true)
 |-- response_sent: string (nullable = true)
 |-- question_user_type: string (nullable = true)
 |-- question_user_status: string (nullable = true)
 |-- question_user_country_code: string (nullable = true)
 |-- question_user_gender: string (nullable = true)
 |-- question_user_dob: string (nullable = true)
 |-- question_user_created_at: string (nullable = true)
 |-- response_user_type: string (nullable = true)
 |-- response_user_status: string (nullable = true)
 |-- response_us

In [None]:
# Save the data as Parquet in Drive, replacing any earlier output
(
    df.write
    .mode("overwrite")
    .parquet(parquet_path)
)

Read the data

In [None]:
# Reload the data from the Parquet copy written above.
# (The CSV could be read instead with spark.read.csv(..., header=True, inferSchema=True).)
df = spark.read.format("parquet").load(parquet_path)


In [None]:
from pyspark.sql import functions as F

# Parse the question_sent string (string -> timestamp) into its own column
question_ts = F.to_timestamp("question_sent")
df = df.withColumn("question_datetime", question_ts)


Check

In [None]:
# Show the raw string next to the parsed timestamp to confirm the conversion
datetime_check = df.select("question_sent", "question_datetime")
datetime_check.show(n=5, truncate=False)


+-----------------------------+--------------------------+
|question_sent                |question_datetime         |
+-----------------------------+--------------------------+
|2020-09-06 18:50:06.723522+00|2020-09-06 18:50:06.723522|
|2020-09-07 07:36:47.580758+00|2020-09-07 07:36:47.580758|
|2020-09-07 08:20:49.551799+00|2020-09-07 08:20:49.551799|
|2020-09-07 08:52:30.552013+00|2020-09-07 08:52:30.552013|
|2020-09-07 10:24:23.681822+00|2020-09-07 10:24:23.681822|
+-----------------------------+--------------------------+
only showing top 5 rows



Seasons


In [None]:
# Calendar month (1-12) of the question timestamp; NULL when the timestamp
# could not be parsed.
df = df.withColumn("month", F.month("question_datetime"))

# Map month -> season. Every season is matched explicitly, so a NULL month
# yields a NULL season instead of being silently counted as "Fall" by a
# catch-all branch.
# NOTE(review): these are northern-hemisphere three-month seasons — confirm
# they are the intended grouping for the countries in this dataset.
df = df.withColumn(
    "season",
    F.when(F.col("month").isin(12, 1, 2), "Winter")
     .when(F.col("month").isin(3, 4, 5), "Spring")
     .when(F.col("month").isin(6, 7, 8), "Summer")
     .when(F.col("month").isin(9, 10, 11), "Fall")
)


**Financial keywords**
We read all the words from the questions and then extract only the words related to finance





In [None]:
# Terms whose presence marks a question/response as finance-related.
# A few entries are two-word phrases ("mobile money", "wire transfer").
financial_keywords = [
    "bank", "money", "price", "loan",
    "credit", "savings", "insurance", "investment",
    "payment", "transaction", "finance", "financial",
    "budget", "debt", "interest", "capital",
    "wealth", "income", "revenue", "profit",
    "loss", "cost", "value", "market",
    "stock", "bond", "asset", "liability",
    "equity", "currency", "cash", "atm",
    "mobile money", "microfinance", "blockchain", "cryptocurrency",
    "crowdfunding", "p2p", "remittance", "transfer",
    "wire transfer",
]




**Crop key words**
We read all the words from the questions and then extract only words related to crop




In [None]:
# Crop names of interest; all are single words, so a whitespace split
# yields the list in the order written.
crop_keywords = """
    alfalfa apples bananas barley beans
    beets berries cabbage canola carrots
    chickpeas citrus clover coffee corn
    cotton cucumber flax fruits grapes
    jute lentils lettuce maize millet
    oats peanut peas potatoes rapeseed
    rice rye sorghum soybean spinach
    sugarcane sunflower tea tobacco tomato
    vegetables wheat yams rubber
""".split()


In [None]:
# Broadcast the keyword list once so the UDF reads it from each executor
spark_context = spark.sparkContext
bc_fin_keywords = spark_context.broadcast(financial_keywords)


**Detect financial keywords**

In [None]:
from pyspark.sql.types import ArrayType, StringType


In [None]:
def classify_financial(text, keywords=None):
    """Return the financial keywords mentioned in ``text``.

    Matching is case-insensitive and anchored on word boundaries, with an
    optional plural suffix ("s" / "es"). A plain substring test flags
    unrelated words, e.g. "atm" inside "treatment", "stock" inside
    "livestock" or "loss" inside "blossom".

    Args:
        text: Free text to scan. ``None`` or an empty string yields ``[]``.
        keywords: Optional iterable of keywords to look for. When omitted,
            the broadcast list ``bc_fin_keywords.value`` is used.

    Returns:
        list[str]: The keywords found, in keyword-list order.
    """
    import re  # local import keeps this cell runnable on its own

    if not text:
        return []
    if keywords is None:
        keywords = bc_fin_keywords.value

    text = text.lower()
    # One small pattern per keyword (rather than a single alternation) so that
    # overlapping entries such as "money" and "mobile money" are both reported.
    return [
        kw
        for kw in keywords
        if re.search(r"\b" + re.escape(kw.lower()) + r"(?:e?s)?\b", text)
    ]

# Wrap the classifier as a Spark UDF that returns an array of strings.
fin_udf = F.udf(classify_financial, returnType=ArrayType(StringType()))

# Scan the question and response together: concat_ws joins the two columns
# with a single space before the text is handed to the UDF.
df = df.withColumn(
    "financial_keywords",
    fin_udf(
        F.concat_ws(" ", F.col("question_content"), F.col("response_content"))
    ),
)



**Create a binary financial-inclusion label**

In [None]:
# 1 when at least one financial keyword was found for the record, else 0.
df = df.withColumn(
    "is_financial",
    F.when(F.size(F.col("financial_keywords")) > 0, 1)
     .otherwise(0),
)


**Proportion of farmer questions about financial inclusion**

In [None]:
# Single-row summary: how many records are flagged as financial, how many
# records there are in total, and the ratio of the two.
df_fin_prop = (
    df.agg(
        F.sum(F.col("is_financial")).alias("financial_questions"),
        F.count("*").alias("total_questions"),
    )
    .withColumn(
        "financial_proportion",
        F.col("financial_questions") / F.col("total_questions"),
    )
)

df_fin_prop.show()


+-------------------+---------------+--------------------+
|financial_questions|total_questions|financial_proportion|
+-------------------+---------------+--------------------+
|            1356238|       11779678| 0.11513370738996431|
+-------------------+---------------+--------------------+



**Seasonal trends for financial questions**

In [None]:
# Number of financially-flagged records per season, listed alphabetically by season.
df_fin_season = (
    df
    .groupBy(F.col("season"))
    .agg(F.sum(F.col("is_financial")).alias("financial_questions"))
    .orderBy(F.col("season"))
)

df_fin_season.show()


+------+-------------------+
|season|financial_questions|
+------+-------------------+
|  Fall|             433479|
|Spring|             271622|
|Summer|             355040|
|Winter|             296097|
+------+-------------------+



**Monthly**

In [None]:
# Number of financially-flagged records per calendar month, in month order.
df_fin_month = (
    df
    .groupBy(F.col("month"))
    .agg(F.sum(F.col("is_financial")).alias("financial_questions"))
    .orderBy(F.col("month"))
)

**Regional**

In [None]:
# Number of financially-flagged records per user country code, largest first.
df_fin_region = (
    df
    .groupBy(F.col("question_user_country_code"))
    .agg(F.sum(F.col("is_financial")).alias("financial_questions"))
    .orderBy(F.desc("financial_questions"))
)


In [None]:
# Run the aggregation and print the per-country counts. A bare DataFrame
# expression only echoes the column names and types, because Spark DataFrames
# are evaluated lazily.
df_fin_region.show()

DataFrame[question_user_country_code: string, financial_questions: bigint]

**Expressing challenges: extract financial problem phrases**

In [None]:
# NOTE(review): problem_words is the very same list as financial_keywords, so
# the "problem" vocabulary adds no extra filtering. Confirm whether a
# dedicated list of problem terms was intended.
problem_words = financial_keywords

# Broadcast the list; the issue-extraction UDF reads it through `.value`.
bc_problem_words = spark.sparkContext.broadcast(value=problem_words)

def extract_issues(text, problem_terms=None, fin_terms=None):
    """Return the lower-cased text when it mentions both a problem term and a financial term.

    Matching is case-insensitive and anchored on word boundaries, with an
    optional plural suffix ("s" / "es"). A plain substring test flags
    unrelated words, e.g. "atm" inside "treatment" or "stock" inside
    "livestock".

    Args:
        text: Free text to scan. ``None`` yields ``""``.
        problem_terms: Optional iterable of problem terms. When omitted,
            ``bc_problem_words.value`` is used.
        fin_terms: Optional iterable of financial terms. When omitted,
            ``bc_fin_keywords.value`` is used.

    Returns:
        str: ``text`` lower-cased when both vocabularies match, else ``""``.
    """
    import re  # local import keeps this cell runnable on its own

    if text is None:
        return ""
    if problem_terms is None:
        problem_terms = bc_problem_words.value
    if fin_terms is None:
        fin_terms = bc_fin_keywords.value

    text = text.lower()

    def mentions_any(terms):
        # True when at least one term occurs in the text as a whole word.
        return any(
            re.search(r"\b" + re.escape(term.lower()) + r"(?:e?s)?\b", text)
            for term in terms
        )

    # NOTE(review): in this notebook problem_words is assigned from
    # financial_keywords, so the two checks below test the same vocabulary.
    if mentions_any(problem_terms) and mentions_any(fin_terms):
        return text
    return ""

# Wrap the extractor as a string-returning Spark UDF.
issue_udf = F.udf(extract_issues, returnType=StringType())

# Only the question text is scanned here (responses are not included).
df = df.withColumn(
    "financial_issue_text",
    issue_udf(F.col("question_content")),
)


**Key words to work with**

In [None]:
# NOTE(review): bc_fin_keywords is already created by an earlier cell from the
# same list; this creates a second, identical broadcast variable.
bc_fin_keywords = spark.sparkContext.broadcast(value=financial_keywords)

from pyspark.sql.types import ArrayType, StringType

def classify_financial(text, keywords=None):
    """Return the financial keywords mentioned in ``text``.

    NOTE(review): this cell redefines ``classify_financial``, which is already
    defined earlier in the notebook. The later definition shadows the earlier
    one, so consider keeping a single definition.

    Matching is case-insensitive and anchored on word boundaries, with an
    optional plural suffix ("s" / "es"). A plain substring test flags
    unrelated words, e.g. "atm" inside "treatment", "stock" inside
    "livestock" or "loss" inside "blossom".

    Args:
        text: Free text to scan. ``None`` or an empty string yields ``[]``.
        keywords: Optional iterable of keywords to look for. When omitted,
            the broadcast list ``bc_fin_keywords.value`` is used.

    Returns:
        list[str]: The keywords found, in keyword-list order.
    """
    import re  # local import keeps this cell runnable on its own

    if not text:
        return []
    if keywords is None:
        keywords = bc_fin_keywords.value

    text = text.lower()
    # One small pattern per keyword (rather than a single alternation) so that
    # overlapping entries such as "money" and "mobile money" are both reported.
    return [
        kw
        for kw in keywords
        if re.search(r"\b" + re.escape(kw.lower()) + r"(?:e?s)?\b", text)
    ]

# Wrap the classifier as a Spark UDF that returns an array of strings.
fin_udf = F.udf(classify_financial, returnType=ArrayType(StringType()))


**Apply UDF**

In [None]:
# NOTE(review): an earlier cell already adds a "financial_keywords" column
# with the same expression; this overwrites it with a recomputed copy.
df = df.withColumn(
    "financial_keywords",
    fin_udf(
        F.concat_ws(" ", F.col("question_content"), F.col("response_content"))
    ),
)


In [None]:
# One output row per (record, matched keyword) pair.
df_exploded = df.withColumn(
    "keyword",
    F.explode(F.col("financial_keywords")),
)


In [None]:
# Keyword mentions per calendar month (season is carried along for hover text),
# ordered by month and then keyword.
df_monthly = (
    df_exploded
    .groupBy(F.col("month"), F.col("season"), F.col("keyword"))
    .agg(F.count("*").alias("count"))
    .orderBy(F.col("month"), F.col("keyword"))
)


In [None]:
# Install Plotly if needed
!pip install plotly

import plotly.express as px
import pandas as pd

# Convert Spark DataFrame to Pandas for plotting
df_plot = df_monthly.toPandas()

# Fix month values
df_plot["month"] = pd.to_numeric(df_plot["month"], errors="coerce")
df_plot = df_plot.dropna(subset=["month"])
df_plot["month"] = df_plot["month"].astype(int)

# Sort by month
df_plot = df_plot.sort_values("month")

# Interactive line chart: keyword trends across months
fig = px.line(
    df_plot,
    x="month",
    y="count",
    color="keyword",
    line_group="keyword",
    hover_data=["season"],
    markers=True,
    title="Keyword Trends by Month (Multi-Year Normalized to One Year)"
)

fig.update_layout(
    xaxis=dict(dtick=1, title="Month"),
    yaxis=dict(title="Number of Questions / Answers"),
    legend_title="Keyword",
    template="plotly_white"
)

fig.show()




**For Crops**


In [None]:
# Broadcast the crop vocabulary; the crop classification UDF reads it through `.value`.
bc_crop_keywords = spark.sparkContext.broadcast(value=crop_keywords)


In [None]:
from pyspark.sql.types import ArrayType, StringType

def classify_crops(text, keywords=None):
    """Return the crop keywords mentioned in ``text``.

    Matching is case-insensitive and anchored on word boundaries, with an
    optional plural suffix ("s" / "es"). A plain substring test flags
    unrelated words, e.g. "rice" inside "price", "oats" inside "goats" or
    "tea" inside "teach".

    Args:
        text: Free text to scan. ``None`` or an empty string yields ``[]``.
        keywords: Optional iterable of keywords to look for. When omitted,
            the broadcast list ``bc_crop_keywords.value`` is used.

    Returns:
        list[str]: The keywords found, in keyword-list order.
    """
    import re  # local import keeps this cell runnable on its own

    if not text:
        return []
    if keywords is None:
        keywords = bc_crop_keywords.value

    text = text.lower()
    return [
        kw
        for kw in keywords
        if re.search(r"\b" + re.escape(kw.lower()) + r"(?:e?s)?\b", text)
    ]

# Wrap the crop classifier as a Spark UDF that returns an array of strings.
crop_udf = F.udf(classify_crops, returnType=ArrayType(StringType()))


In [None]:
# Scan the question and response together: concat_ws joins the two columns
# with a single space before the text is handed to the crop UDF.
df = df.withColumn(
    "crop_keywords",
    crop_udf(
        F.concat_ws(" ", F.col("question_content"), F.col("response_content"))
    ),
)


In [None]:
# One output row per (record, matched crop keyword) pair.
df_crop_exploded = df.withColumn(
    "crop_keyword",
    F.explode(F.col("crop_keywords")),
)


In [None]:
# Crop-keyword mentions per calendar month (season is carried along for hover
# text), ordered by month and then keyword.
df_crop_monthly = (
    df_crop_exploded
    .groupBy(F.col("month"), F.col("season"), F.col("crop_keyword"))
    .agg(F.count("*").alias("count"))
    .orderBy(F.col("month"), F.col("crop_keyword"))
)


In [None]:
# Collect the aggregated counts on the driver and tidy the month column in one
# chain: coerce to numeric, drop rows whose month is missing, cast to int, and
# sort so the lines are drawn left to right.
df_crop_plot = (
    df_crop_monthly.toPandas()
    .assign(month=lambda frame: pd.to_numeric(frame["month"], errors="coerce"))
    .dropna(subset=["month"])
    .assign(month=lambda frame: frame["month"].astype(int))
    .sort_values("month")
)

# Interactive line chart: one line per crop keyword across the twelve months.
fig = px.line(
    df_crop_plot,
    x="month",
    y="count",
    color="crop_keyword",
    line_group="crop_keyword",
    hover_data=["season"],
    markers=True,
    title="Crop Keyword Trends by Month (Multi-Year Normalized to One Year)",
)

# Axis titles, one tick per month, and a light theme.
fig.update_layout(
    xaxis={"dtick": 1, "title": "Month"},
    yaxis={"title": "Number of Questions / Answers"},
    legend_title="Crop Keyword",
    template="plotly_white",
)

fig.show()


## E) Conclusion

### Results
We now have the most-used words for each language. The next step is to translate them and classify each word as related to finance, to crops, or to general topics, so that the results can feed into challenges 4 and 5.