In [23]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import regexp_replace, lower, col, udf
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.functions import concat_ws


In [24]:
# Import the project helper modules. The scripts directory sits at a different
# relative location depending on where the kernel runs, so try the local layout
# first and fall back to the remote-kernel layout.
_last_import_error = None
for _scripts_dir in ('../../scripts', './scripts'):
    # Avoid stacking duplicate entries when this cell is re-run.
    if _scripts_dir not in sys.path:
        sys.path.append(_scripts_dir)
    try:
        import database
        import utils
        import spark
    except ImportError as _exc:
        # Only a failed import means "wrong path"; any other error raised while
        # loading the modules is a real bug and must surface unchanged.
        _last_import_error = _exc
        continue
    break
else:
    # Chain the last ImportError so the traceback names the missing module.
    raise RuntimeError(
        'Failed to import from both local and remote paths. Program terminated.'
    ) from _last_import_error


In [25]:
# Re-import the helper module under its own alias: the assignment below rebinds
# the name `spark` from the module to the session, so without the alias a
# re-run of this cell would call setup_spark_session on the session object.
import spark as spark_module
spark = spark_module.setup_spark_session(app_name='Preprocessingpy')

In [21]:
# Define preprocessing pipeline
def preprocess_data(input_path, output_path):
    """Clean a tab-separated tweet file and write the cleaned text out.

    The input is read as a headerless, tab-separated file with two columns,
    ``target`` (int) and ``text`` (string). Rows containing any null are
    dropped. The text is lowercased, URLs and @-mentions are stripped, the
    result is tokenized, stop words are removed, and the remaining tokens are
    re-joined with single spaces into ``preprocessed_text``.

    Uses the notebook-level ``spark`` session.

    Args:
        input_path: Location of the tab-separated input data.
        output_path: Destination for the output; existing data there is
            overwritten.

    Returns:
        The transformed DataFrame with columns ``target``, ``text``,
        ``tokens``, ``filtered_tokens`` and ``preprocessed_text``.
    """
    # Define schema
    schema = StructType([
        StructField('target', IntegerType(), True),
        StructField('text', StringType(), True),
    ])

    # Read data and drop rows with a null in any column
    df = spark.read.csv(input_path, header=False, schema=schema, sep="\t")
    df = df.na.drop()

    # Convert text to lowercase
    df = df.withColumn('text', lower(col('text')))

    # Strip URLs and @-mentions. Raw string so the backslashes reach the regex
    # engine untouched; the dot after "www" is escaped so only a literal
    # "www." prefix is treated as a URL.
    df = df.withColumn('text', regexp_replace(col('text'), r'http\S+|www\.\S+|@\S+', ''))

    # Tokenize text
    tokenizer = Tokenizer(inputCol='text', outputCol='tokens')

    # Remove stop words
    remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')

    # Chain both stages and apply them
    pipeline = Pipeline(stages=[tokenizer, remover])
    pipeline_model = pipeline.fit(df)
    df_preprocessed = pipeline_model.transform(df)

    # Join the token array back into a single space-separated string using the
    # built-in column function rather than a Python UDF.
    df_preprocessed = df_preprocessed.withColumn(
        'preprocessed_text', concat_ws(' ', col('filtered_tokens'))
    )

    # Save preprocessed data.
    # NOTE(review): only the text column is written; `target` is not persisted
    # - confirm that downstream consumers do not need the label.
    df_preprocessed.select('preprocessed_text').write.csv(
        output_path, mode='overwrite', header=False, sep="\t"
    )

    return df_preprocessed



DataFrame[target: int, text: string, tokens: array<string>, filtered_tokens: array<string>, preprocessed_text: string]

In [22]:
# Preprocess the data
input_path = 'hdfs://localhost:54310/user/datascience/data/processed/tweets_subset'
output_path = 'hdfs://localhost:54310/user/datascience/data/preprocessed/tweets_preprocess'
try:
    preprocess_data(input_path, output_path)
finally:
    # Stop Spark session even if preprocessing raised, so a failed run does
    # not leave the session open.
    spark.stop()