In [1]:
!pip install wordsegment
!pip install pyspark
!pip install dask

Collecting wordsegment
  Downloading wordsegment-1.3.1-py2.py3-none-any.whl.metadata (7.7 kB)
Downloading wordsegment-1.3.1-py2.py3-none-any.whl (4.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.8/4.8 MB[0m [31m38.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: wordsegment
Successfully installed wordsegment-1.3.1


In [2]:
from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification
import torch
import pandas as pd
import re
import wordsegment
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import numpy as np
from multiprocessing import Pool
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, MapType, StringType, FloatType
from transformers import AutoTokenizer, AutoModelForSequenceClassification

In [3]:
# Mount Google Drive at /content/drive; the data paths used further down live under it.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Build (or reuse) the Spark session, requesting 8g of memory for both the
# executor and the driver.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

## Morality analysis

In [5]:
# Load the politeness-results JSON file from Drive into a pandas DataFrame.
df = pd.read_json("/content/drive/MyDrive/Lu&Marina/data/politeness_results_normalized_shrt.json")

In [6]:
# Give every row a sequential integer id (0..len(df)-1); the classification
# step below emits this id alongside each probability.
df['internal_id'] = range(len(df))

In [7]:
# Preview the first rows to check the load and the new internal_id column.
df.head()

Unnamed: 0,id,from,value,norm_Hedges,norm_Swearing,norm_Reassurance,norm_Please,norm_Gratitude,norm_Apology,norm_Affirmation,internal_id
0,QWJhYvA,human,Summarize the main ideas of Jeff Walker's Prod...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0
1,QWJhYvA,gpt,Here are the main ideas of Jeff Walker's Produ...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1
2,QWJhYvA,human,Summarize the main ideas of Brendon Burchard's...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2
3,QWJhYvA,gpt,Here are the main ideas of Brendon Burchard's ...,0.0077,0.0,0.0,0.0,0.0,0.0,0.0,3
4,QWJhYvA,human,What are the mental triggers in Jeff Walker's ...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4


In [8]:
# Rows per chunk for a six-way split; the result is not a whole number,
# so the six chunks cannot all be the same size.
df.shape[0]/6

100499.33333333333

In [9]:
num_chunks = 6
chunk_size = len(df) // num_chunks

# Row offset at which each chunk begins. Every chunk ends where the next one
# starts, except the final chunk, which runs to the end of df so that the
# remainder rows are not dropped.
chunk_starts = [index * chunk_size for index in range(num_chunks)]
chunk_ends = chunk_starts[1:] + [len(df)]

# Slice df into consecutive, non-overlapping DataFrames
dfs = [df.iloc[begin:end] for begin, end in zip(chunk_starts, chunk_ends)]

# Expose each chunk under its own name
df1, df2, df3, df4, df5, df6 = dfs

In [62]:
# Convert one pandas chunk into a Spark DataFrame for distributed processing.
# NOTE(review): the chunk (df6) is selected by hand here, and the output directory
# further down is hardcoded to ".../processed6" — presumably the two must be
# changed together when another chunk is processed; confirm.
df_spark = spark.createDataFrame(df6)

In [63]:
# Names of the foundations to score, and the shared prefix of the model ids;
# a full model id is MODEL_BASE + <foundation>.
FOUNDATIONS = ["care", "fairness", "loyalty", "authority", "sanctity"]
MODEL_BASE = "joshnguyen/mformer-"

# Load the tokenizer of the first foundation's model in this (driver) process.
# NOTE(review): nothing is actually broadcast here, and get_model_and_tokenizer
# below loads its own tokenizer per foundation — this global looks unused in the
# visible cells; confirm before relying on it.
tokenizer = AutoTokenizer.from_pretrained(MODEL_BASE + FOUNDATIONS[0])

In [64]:
# Preprocess text function
def preprocess_text(text):
    """Clean a piece of text for classification.

    Steps, in order: drop URLs, drop every character that is not an ASCII
    letter or whitespace (so digits, punctuation and non-ASCII letters are
    removed), then collapse all whitespace runs to single spaces and trim.

    Args:
        text: The raw text. May be None or a non-string (e.g. a NaN float
            coming from a missing value).

    Returns:
        The cleaned string. Returns "" when `text` is not a string, so a
        missing value does not raise inside the Spark UDF.
    """
    # Missing values reach the UDF as None (or NaN); re.sub would raise TypeError on them.
    if not isinstance(text, str):
        return ""

    text = re.sub(r'http\S+|www\S+|https\S+', '', text)  # Remove URLs
    text = re.sub(r'[^a-zA-Z\s]', '', text)  # Keep only ASCII letters and whitespace
    text = re.sub(r'[\n\r\t]+', ' ', text)  # Replace newlines, tabs, etc. with spaces
    text = re.sub(r'\s+', ' ', text).strip()  # Collapse whitespace runs and trim the ends
    return text

In [65]:
# Wrap preprocess_text as a Spark UDF; the string return type (udf's default)
# is spelled out explicitly.
preprocess_udf = udf(preprocess_text, StringType())

In [66]:
# Add a "processed_text" column holding the cleaned version of "value".
df_spark = df_spark.withColumn("processed_text", preprocess_udf(col("value")))

In [67]:
# Maps a foundation name to its already-loaded (tokenizer, model) pair
MODEL_CACHE = {}

def get_model_and_tokenizer(foundation):
    """Return the (tokenizer, model) pair for `foundation`, loading it on first use.

    On a cache miss the tokenizer and the sequence-classification model are
    loaded from MODEL_BASE + foundation, the model is switched to eval mode and
    moved to CUDA when available (CPU otherwise), and the pair is stored in
    MODEL_CACHE. Later calls with the same foundation return the stored pair.
    """
    if foundation in MODEL_CACHE:
        return MODEL_CACHE[foundation]

    print(f"Loading model for foundation: {foundation}")
    model_path = MODEL_BASE + foundation
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_path)
    loaded_model = AutoModelForSequenceClassification.from_pretrained(model_path).eval()

    # Prefer the GPU when one is visible to torch
    target = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    loaded_model.to(target)

    MODEL_CACHE[foundation] = (loaded_tokenizer, loaded_model)
    return MODEL_CACHE[foundation]

In [68]:
# Per-partition classifier
def classify_text_partition_optimized(partition, foundation):
    """Yield (internal_id, probability) for every row of a partition.

    The tokenizer and model for `foundation` are obtained once per call via
    get_model_and_tokenizer. Rows are scored one at a time; the probability is
    the softmax value at index 1 of the model's logits. Rows whose
    'processed_text' is empty (or otherwise falsy) are not sent through the
    model and get a probability of 0.0.
    """
    tokenizer, model = get_model_and_tokenizer(foundation)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    for row in partition:
        text = row['processed_text']

        if text:
            # Encode the single text and move the tensors next to the model
            encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
            with torch.no_grad():
                logits = model(**encoded).logits
                positive_prob = torch.softmax(logits, dim=1)[:, 1].item()
            yield (row['internal_id'], positive_prob)
        else:
            # Nothing to classify
            yield (row['internal_id'], 0.0)

In [69]:
# NOTE(review): this repeats the earlier withColumn cell verbatim. It replaces
# "processed_text" with the same expression, so it looks redundant — confirm
# and consider removing.
df_spark = df_spark.withColumn("processed_text", preprocess_udf(col("value")))

In [70]:
import os

# Directory on Drive that receives the per-foundation Parquet outputs.
# NOTE(review): the "processed6" suffix is hardcoded — presumably it tracks the
# chunk (df6) loaded into df_spark above; confirm and keep the two in sync.
output_dir = "/content/drive/MyDrive/Lu&Marina/data/processed6"
os.makedirs(output_dir, exist_ok=True)  # no error if the directory already exists

In [71]:
from pyspark.sql.types import StructType, StructField, LongType

# Score the whole Spark DataFrame once per foundation and write each
# foundation's (internal_id, probability) pairs to its own Parquet dataset.
for foundation in FOUNDATIONS:
    print(f"Processing foundation: {foundation}...")

    # mapPartitions is lazy, so the function is only executed when the write
    # below triggers the job. Bind the current foundation as a default argument
    # so the function carries its own value instead of looking up the loop
    # variable when it is finally evaluated.
    results_rdd = df_spark.rdd.mapPartitions(
        lambda partition, foundation=foundation: classify_text_partition_optimized(partition, foundation)
    )

    # Convert results to DataFrame; the probability column is named after the foundation
    results_schema = StructType([
        StructField("internal_id", LongType(), True),
        StructField(f"{foundation}_prob", FloatType(), True)
    ])
    foundation_df = spark.createDataFrame(results_rdd, schema=results_schema)

    # Save each foundation as soon as it is computed; an existing output at
    # the same path is overwritten
    output_path = os.path.join(output_dir, f"{foundation}_probabilities.parquet")
    print(f"Saving results for foundation '{foundation}' to {output_path}...")
    foundation_df.write.mode("overwrite").parquet(output_path)

    print(f"Completed processing and saving for foundation: {foundation}.")

print("All foundations processed successfully!")

Processing foundation: care...
Saving results for foundation 'care' to /content/drive/MyDrive/Lu&Marina/data/processed6/care_probabilities.parquet...
Completed processing and saving for foundation: care.
Processing foundation: fairness...
Saving results for foundation 'fairness' to /content/drive/MyDrive/Lu&Marina/data/processed6/fairness_probabilities.parquet...
Completed processing and saving for foundation: fairness.
Processing foundation: loyalty...
Saving results for foundation 'loyalty' to /content/drive/MyDrive/Lu&Marina/data/processed6/loyalty_probabilities.parquet...
Completed processing and saving for foundation: loyalty.
Processing foundation: authority...
Saving results for foundation 'authority' to /content/drive/MyDrive/Lu&Marina/data/processed6/authority_probabilities.parquet...
Completed processing and saving for foundation: authority.
Processing foundation: sanctity...
Saving results for foundation 'sanctity' to /content/drive/MyDrive/Lu&Marina/data/processed6/sanctity