# Big Data Project: Transforming Scientific Articles into Videos with Speech using Apache Spark and Kafka


In [None]:
# make modules from py files auto-reload when changed
%load_ext autoreload
%autoreload 2

**TODO:** create the folders `data`, `checkpoints`, `output` and `dbg` before running the notebook.


# TTS 


## Basic run just for test 

In [None]:

from datetime import datetime
from pathlib import Path

from ArticleReader.Chunker import Chunker
from ArticleReader.LatexToSpeech import LatexParser
from ArticleReader.Narrator import Narrator


In [None]:
input_file = "data/arXiv-2106.04624v1/main.tex"
# Output files are named after the current date and hour.
output_file = "output/" + datetime.now().strftime(r"%y.%m.%d-%H")

# Make sure the folders this notebook writes to exist, so that a fresh
# checkout survives Restart & Run All (in case save_text / save_chunks_as_text
# do not create missing directories themselves).
for folder in ("output", "dbg"):
    Path(folder).mkdir(parents=True, exist_ok=True)

# Convert the LaTeX source to text and keep a copy for debugging.
parser = LatexParser()
content = parser.read_latex(input_file)
processed = parser.custom_latex_to_text(content)
parser.save_text(processed, "dbg/spec_my.txt")

# Save the tables reported by the parser separately.
tables = parser.get_tables()
parser.save_text(tables, "dbg/tables.tex")

In [None]:
chunker = Chunker(max_len=200)
chunker.split_text_into_chunks(processed)
chunks = chunker.get_test_batch(10, 0)
# chunks = chunker.chunks
chunker.save_chunks_as_text(output_file + ".md", chunks)
# `get_test_batch` results are used as a pandas DataFrame with a `sentence`
# column elsewhere in this notebook. Iterating a DataFrame yields its column
# names, so measure the sentences themselves.
print("text chunks:", chunks["sentence"].map(len).tolist())

# Spark 

In [None]:
#import findspark
import os
import sys
import pandas as pd 
import torch

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf, col, lit, desc, floor
from pyspark.sql.functions import collect_list, flatten
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark import pandas as psp


In [None]:
sys.executable

In [None]:
os.getcwd()

In [None]:
os.environ['PYTHONPATH'] = '/usr/local/spark/spark/python/lib/py4j-0.10.9.7-src.zip:/usr/local/spark/spark/python/:'

In [None]:
# Show the Spark-related environment variables (nothing is set here).
# .get() keeps the cell from raising KeyError when a variable is missing.
for var_name in ("SPARK_HOME", "PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON", "PYTHONPATH"):
    print(os.environ.get(var_name, f"<{var_name} is not set>"))
#print(os.environ['JAVA_HOME'] )

In [None]:
os.environ

In [None]:

# Get number of CPU cores
# workers = 1
# cpus_limit = os.cpu_count() -1 
# mem_limit = "1g" # prod: "16g"

# simulating a cluster with 2 workers
workers = 2
# os.cpu_count() may return None, and on machines with few cores
# "cores / workers - 1" drops to 0, which torch.set_num_threads() rejects.
# Always keep at least one core per worker.
cpus_limit = max(1, (os.cpu_count() or 1) // workers - 1)
mem_limit = "2g" # prod: "16g"/ workers

# Configure PyTorch for CPU parallelism
torch.set_num_threads(cpus_limit)

# Create Spark session with configurations
# NOTE(review): with master "local[*]" the executor-level settings below
# probably have no effect (everything runs inside the driver JVM) — confirm
# before relying on them to simulate a cluster.
spark = SparkSession.builder \
    .appName("TTS CPU Inference") \
    .master("local[*]") \
    .config("spark.executor.cores", cpus_limit) \
    .config("spark.executor.instances", workers) \
    .config("spark.executor.memory", mem_limit) \
    .config("spark.task.cpus", cpus_limit) \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("spark.sql.shuffle.partitions", "1") \
    .getOrCreate()

    # Additional configs that might be useful in future:
    # .config("spark.executor.resource.gpu.amount", "1") \
    # .config("spark.executor.memoryOverhead", "<memory>")

print(f"Spark version: {spark.version}")
print(f"Using {cpus_limit} CPU cores for inference.")



1. Memory Allocation and Executor Configuration

    Spark executors are allocated a fixed amount of memory when the application starts. This memory is split between:
        JVM heap memory: For Spark tasks and operations.
        Off-heap memory: For operations like shuffle or caching.
        GPU memory (if applicable): Used by PyTorch or other frameworks.
    Memory allocation is configured using:

```
--executor-memory <memory>
--driver-memory <memory>
--conf spark.memory.fraction=<fraction>
```

2. Dynamic Resource Allocation

    Spark can adjust resource allocation dynamically if dynamic resource allocation is enabled.
    Configuration:

```
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.minExecutors=<min>
--conf spark.dynamicAllocation.maxExecutors=<max>
```
3. Task Partitioning and Locality

    Spark divides the dataset into partitions, which determine the unit of work assigned to each executor.
    Smaller partitions reduce the likelihood of memory overload but increase overhead. Aim for ~128MB partition sizes for large datasets.
    Control partition size with:
```
--conf spark.sql.files.maxPartitionBytes=<size>
```

## Playground for batching

In [None]:
sentences

In [None]:
waveforms

In [None]:
mel_lens = narrator.add_pauses(sentences.sentence, mel_lengths, pause_dur=40)   

In [None]:
mel_lengths

In [None]:
mel_lens

In [None]:
# Relies on `waveforms`, `mel_lens` and `narrator` created in other cells.
# Split the batch tensor into one tensor per sentence.
per_sentence = torch.tensor_split(waveforms.squeeze(1), len(waveforms), dim=0)

#arr = [a[:, :l] for a, l in zip(arr, mel_lengths * self.hop_len)]

# TODO: cut padding
# Keep only the first (mel length * hop length) samples of every waveform.
sample_counts = mel_lens * narrator.hop_len
arr = [
    chunk[:, :n_samples].squeeze(0).numpy()
    for chunk, n_samples in zip(per_sentence, sample_counts)
]


In [None]:
arr[0].shape

In [None]:
type(arr[0])

In [None]:
arr[1].shape

## Pandas UDF approach 


In [None]:

@pandas_udf(StructType([StructField("waveform", ArrayType(FloatType())),StructField("mel_lengths", IntegerType())]))
def predict_batch_udf(sentences: pd.Series) -> pd.DataFrame:
    """Run TTS inference on one batch of sentences.

    A fresh Narrator is created on every call. The result has two columns:
    ``waveform`` (each waveform cut to its mel length times the hop length)
    and ``mel_lengths`` (the lengths returned by ``add_pauses``).
    """
    # TODO: calculate and store "seq_len"
    # TODO: non-default model initialization
    tts = Narrator()
    batch_waveforms, raw_mel_lengths = tts.infer(sentences)

    # One tensor per element of the batch dimension.
    per_sentence = torch.tensor_split(
        batch_waveforms.squeeze(1), len(batch_waveforms), dim=0
    )

    # Add more pause where needed (very naive currently)
    padded_mel_lengths = tts.add_pauses(sentences, raw_mel_lengths, pause_dur=40)

    # Cut silence padding while applying pauses from above
    sample_counts = padded_mel_lengths * tts.hop_len
    trimmed = [
        wave[:, :n_samples].squeeze(0).numpy()
        for wave, n_samples in zip(per_sentence, sample_counts)
    ]

    return pd.DataFrame({"waveform": trimmed, "mel_lengths": padded_mel_lengths})


In [None]:
# psp.from_pandas(
chunks = spark.createDataFrame(chunker.get_test_batch(15,295))

In [None]:
# Assign every row a `part` id from the running total of `text_len`,
# walking the rows from longest to shortest text.
text_volume_max = 600

# TODO: maybe can use partitioning here for separating whole text into chapters?
longest_first = Window.orderBy(desc('text_len'))
text_volume_window = longest_first.rowsBetween(Window.unboundedPreceding, 0)

# Running sum of text_len up to and including the current row.
running_volume = F.sum('text_len').over(text_volume_window)
with_volume = chunks.withColumn('cum_text_volume', running_volume)

# Rows whose running total falls in the same multiple of text_volume_max share a part.
step1 = with_volume.withColumn(
    'part', floor(col('cum_text_volume') / lit(text_volume_max))
)
step1

In [None]:
nparts =  step1.select((lit(1) + F.max("part")).alias("npart")).first()
nparts

In [None]:
nparts[0]

In [None]:
step1.show()

In [None]:
step1 = step1.repartitionByRange(nparts[0], "part")

In [None]:
step1.rdd.getNumPartitions()

In [None]:

step2 = step1.withColumn("prediction", predict_batch_udf(col("sentence"))).cache()


In [None]:

# Flatten the UDF's struct output into top-level columns.
step3 = step2.select("index", "sentence", "text_len", "prediction.*")

step4 = step3.sort("index")

print("recombine batch")
# collect_list gives no ordering guarantee, even after sort(), so collect
# (index, waveform) pairs and sort them inside the aggregate; sort_array
# orders structs by their first field, i.e. by index.
ordered_waveforms = F.sort_array(F.collect_list(F.struct("index", "waveform")))
# alias() belongs on the aggregated column; on the DataFrame it only sets a
# table alias and leaves the column with an auto-generated name.
wf = step4.agg(flatten(ordered_waveforms.getField("waveform")).alias("speech"))

#torch.cat(tuple(batch_converted.waveform), dim=1)

final = step4.cache()

In [None]:
step4.select(F.size("waveform")).show()

In [None]:
final.show()

In [None]:
wfc = wf.cache()

In [None]:
wfc.show()

In [None]:
waveform = wfc.collect()[0]

In [None]:
narrator = Narrator()

In [None]:
tens = torch.Tensor(waveform)

In [None]:
tens.shape

In [None]:
case_file = "output/spark_test"


print("saving sound")
narrator.save_audio(case_file + ".wav",tens )
print("done saving sound")

In [None]:
def calculate_batch_size(memory_per_sentence, total_memory, safety_factor=0.8):
    """Estimate how many sentences can be processed at once.

    Args:
        memory_per_sentence: Estimated memory needed for one sentence, in the
            same unit as ``total_memory``. Must be positive.
        total_memory: Total memory available.
        safety_factor: Fraction of ``total_memory`` to use; the rest is kept
            as a buffer (0.8 uses 80% of the total).

    Returns:
        The batch size as an int, never smaller than 1, so callers can safely
        divide by it or use it as a step.

    Raises:
        ValueError: If ``memory_per_sentence`` is not positive.
    """
    if memory_per_sentence <= 0:
        raise ValueError("memory_per_sentence must be positive")
    usable_memory = total_memory * safety_factor
    return max(1, int(usable_memory / memory_per_sentence))

batch_size = calculate_batch_size(memory_per_sentence=500_000_000, total_memory=16_000_000_000)
batch_size
# Result: batch_size = 25

## Regular UDF approach

In [None]:

chunks = psp.from_pandas(chunker.get_test_batch(3,53))
#chunks = spark.from_pandas(chunker.chunks)

In [None]:
type(chunks)

In [None]:
chunks.head()

In [None]:

# Estimate batch size
batch_size = calculate_batch_size(memory_per_sentence=500_000_000, total_memory=16_000_000_000)
batch_size


In [None]:
chunks.count()

In [None]:

# Repartition the DataFrame to match the batch size
num_partitions = max(1, chunks.sentence.count() // batch_size)
num_partitions


In [None]:

# from pyspark import SparkContext
# import torch

# sc = SparkContext.getOrCreate()

# # Load your PyTorch model
# model = torch.load('path_to_model.pth', map_location='cpu')
# model.eval()

# Broadcast the model
#broadcast_model = sc.broadcast(model)



In [None]:
# Assign the structure and naming for newly created columns.
# Waveforms are arrays of float samples (the same element type that the
# predict_batch_udf return schema uses), not a single integer.
schema = StructType([
    StructField("waveforms", ArrayType(FloatType()), False),
    StructField("mel_lengths", IntegerType(), False),
])

In [None]:
# NOTE(review): the declared return type is 'string', but the function returns
# a (waveforms, mel_lengths) tuple, and a later cell selects "Result.*" as if
# the result were a struct. PandasUDFType.SCALAR is passed to the plain `udf`
# decorator here — presumably a leftover from a pandas_udf variant; confirm.
@udf('string', PandasUDFType.SCALAR)
def tts_udf(batch):
    """Run Narrator inference on `batch` and return the raw (waveforms, mel_lengths) pair."""
    # A new Narrator is constructed on every call.
    narrator = Narrator()    
    waveforms, mel_lengths = narrator.infer(batch)    
    return waveforms, mel_lengths


    # NOTE(review): everything below this point is unreachable because of the
    # return above; it also refers to `batch_df`, which is not defined in this
    # function.
    # ensure sentences are sorted by seq_len
    batch_df.loc[:,"seq_len"] = batch_df.sentence.map(narrator.seq_len)
    batch_df.sort_values("seq_len", ascending=False, inplace=True)

    # # defining pauses between paragraphs                
    # mel_lengths = narrator.add_pauses(batch_df.sentence, mel_lengths, pause_dur=40)        

    # # turning tensor into regular array
    # arr = torch.tensor_split(waveforms.squeeze(1), len(waveforms), dim=0)

    # # cut padding
    # arr = [a[:, :l] for a, l in zip(arr, mel_lengths * narrator.hop_len)]
    
    # mel_lengths = mel_lengths.detach().numpy()
    # batch_df["waveform"] = arr
    # batch_df["mel_lengths"] = mel_lengths
    # batch_df["durations_sec"] = mel_lengths / 22050.0
    # return batch_df


In [None]:
# Apply the UDF
step1 = chunks.to_spark().sort("text_len", ascending=False ) 
step1.show()

In [None]:
type(step1)

In [None]:

step2 = step1.withColumn("Result", tts_udf("sentence")) 
step2.show()

In [None]:
step3 = step2.select("index", "sentence", "text_len", "Result.*")


In [None]:
# NOTE(review): this redefines `tts_udf` from an earlier cell, and
# `broadcast_model` is only created in commented-out code above — confirm both
# before running this cell.
@pandas_udf('string', PandasUDFType.SCALAR)
def tts_udf(batch):
    """Run the broadcast model on `batch` in GPU-sized mini-batches.

    The mini-batch size is derived from 80% of the total memory of CUDA
    device 0 and a fixed per-sentence estimate. Returns a list with the model
    outputs of all mini-batches, in input order.
    """
    model = broadcast_model.value.to('cuda')
    memory_per_sentence = 500_000_000  # Estimate
    total_memory = torch.cuda.get_device_properties(0).total_memory * 0.8
    # Keep the step at 1 or more: range() rejects a zero step.
    batch_size = max(1, int(total_memory / memory_per_sentence))

    results = []
    for start in range(0, len(batch), batch_size):
        # Slice into a separate name. Rebinding `batch` here would make every
        # later slice come from the previous mini-batch instead of the input,
        # silently dropping everything after the first mini-batch.
        mini_batch = batch[start:start + batch_size]
        with torch.no_grad():
            inputs = [torch.tensor(sentence).to('cuda') for sentence in mini_batch]
            outputs = model(inputs)
            results.extend(outputs.cpu().numpy())
    return results



In [None]:


# Run the broadcast model over every element of one partition
def process_batch(partition):
    """Apply the broadcast model to each item of `partition`.

    The model is moved to 'cuda' once per call. Returns an iterator over the
    per-item outputs converted to NumPy arrays.
    """
    gpu_model = broadcast_model.value.to('cuda')

    def _infer(item):
        # Inference only, so gradient tracking is switched off.
        with torch.no_grad():
            return gpu_model(item).cpu().numpy()

    return iter([_infer(item) for item in partition])

# Apply the function to each partition
processed_rdd = df.rdd.mapPartitions(process_batch)
result_df = processed_rdd.toDF(["processed_output"])

Set the executor's memory and GPU resources:

```
--conf spark.executor.memory=16G
--conf spark.executor.resource.gpu.amount=1
```

Adjust the number of partitions:
```
--conf spark.sql.shuffle.partitions=<num_partitions>
```



# End

# Trash 

In [None]:
batch_df = chunker.get_test_batch(3,53)
batch_df


In [None]:
batch_df.loc[:,"seq"] = batch_df.sentence.map(narrator.tts.text_to_seq)

In [None]:
batch_df

In [None]:
batch_df.seq.map(lambda s: len(s[0]))

In [None]:
sentences = batch_df.sort_values("text_len" ,ascending = False).reset_index()
sentences

In [None]:

narrator = Narrator()    
waveforms, mel_lengths = narrator.infer(sentences.sentence)


In [None]:
waveforms.shape

In [None]:
arr = torch.tensor_split(waveforms.squeeze(1), len(waveforms), dim=0)
arr = [a[0].squeeze(0).numpy() for a in arr]
# cut padding
#arr = [a[:, :l] for a, l in zip(arr, mel_lengths * narrator.hop_len)]

#mel_lengths = mel_lengths.detach().numpy()
# batch_df["waveform"] = arr
# batch_df["mel_lengths"] = mel_lengths
# batch_df["durations_sec"] = mel_lengths / 22050.0


In [None]:
arr[0]

In [None]:


output = pd.DataFrame({"waveform": arr, "mel_lengths": mel_lengths})
# sentences["waveforms"] = waveforms
# sentences["mel_lengths"] = mel_lengths    


In [None]:
output

In [None]:
output.shape

In [None]:
output.loc[0,"waveform"]