In [None]:
!pip install datasets

In [None]:
from datasets import load_dataset
import pandas as pd

# Open the full "train" split of the `bookcorpus` dataset in streaming mode
# (streaming=True); the examples are consumed later by iterating over `dataset`.
dataset = load_dataset("bookcorpus", split="train", streaming=True)

In [None]:
csv_path = "bookcorpus.csv"
batch_size = 200_000

# Start from a file that contains nothing but the header row.
with open(csv_path, 'w', encoding='utf-8') as header_out:
    header_out.write("text\n")

# Batched export: buffer streamed examples and append them to the CSV each
# time `batch_size` of them have been collected.
# NOTE(review): `df` below is a *pandas* DataFrame holding the most recently
# written batch; it stays defined after this cell finishes.
batch, total = [], 0

for seen, example in enumerate(dataset, start=1):
    batch.append(example)
    if seen % batch_size:
        continue  # batch not full yet
    df = pd.DataFrame(batch)
    df.to_csv(csv_path, mode='a', header=False, index=False)
    total += len(batch)
    print(f"✅ Saved {total:,} rows...")
    batch = []

# Flush the final, partially filled batch (if any).
if batch:
    df = pd.DataFrame(batch)
    df.to_csv(csv_path, mode='a', header=False, index=False)
    total += len(batch)
    print(f"✅ Final batch saved (total: {total:,})")


In [None]:
# Environment bootstrap: install OpenJDK 11, download and unpack the
# Spark 3.4.1 (Hadoop 3 build) archive into the current directory, and
# install the `findspark` helper package.
!apt-get install openjdk-11-jdk -y
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar -xvzf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

import os
# Point the process environment at the Java and Spark installations.
# NOTE(review): both paths are hard-coded; SPARK_HOME assumes the archive was
# unpacked under /content -- confirm for the target environment.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
# Presumably makes the unpacked Spark's `pyspark` importable via SPARK_HOME.
findspark.init()

from pyspark.sql import SparkSession
# Create (or reuse) the Spark session; the Spark UI port is pinned to 4040.
spark = SparkSession.builder \
    .appName("BookCorpusAnalysis") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

In [None]:
from pyspark.sql.functions import col, split, size

# Load the exported corpus as a Spark DataFrame. Without this, `df` is whatever
# an earlier cell left behind (a pandas DataFrame of one export batch), which
# has no `withColumn`. The file was written by pandas, which escapes embedded
# double quotes by doubling them, hence escape='"'.
df = spark.read.csv("bookcorpus.csv", header=True, quote='"', escape='"')

# Add a column with the number of whitespace-separated tokens per record.
df_words = df.withColumn("word_count", size(split(col("text"), "\\s+")))

# Summary statistics of the per-record word counts.
df_words.select("word_count").summary("count", "mean", "min", "25%", "50%", "75%", "max").show()



In [None]:
# Ngrok για Spark UI
!wget -q -nc https://bin.equinox.io/c/bNyj1mQVY4c/ngrok-stable-linux-amd64.zip
!unzip -n ngrok-stable-linux-amd64.zip
# Ορισμός του Ngrok token
!./ngrok authtoken 2uzOrSfKyZmx3KsoUMO8UpSYJ5x_3yfwivzhuBKEVTJJjJncJ
# Ξεκινάμε το ngrok tunnel προς την πόρτα του Spark UI
get_ipython().system_raw('./ngrok http 4040 --log=stdout > ngrok.log &')

In [None]:
import time, requests
time.sleep(3)

try:
    r = requests.get('http://localhost:4040/api/tunnels')
    ui_url = r.json()['tunnels'][0]['public_url']
    print(f"🚀 Spark UI is live at: {ui_url}")
except:
    print("❌ Ngrok δεν συνδέθηκε")
    !tail -n 20 ngrok.log



In [None]:
from pyspark.sql.functions import explode, lower, regexp_replace, split, col, length

# Text clean-up -> tokenisation -> one output row per word.
# NOTE(review): `df` must be a Spark DataFrame with a `text` column here --
# confirm where it is defined.
letters_only = regexp_replace(col("text"), r"[^a-zA-Z]", " ")  # non-letters become spaces
word_array = split(lower(letters_only), r"\s+")

df_tokens = (
    df.select("text")
    .na.drop()
    .repartition(8)
    .select(explode(word_array).alias("word"))
    .filter(length(col("word")) > 2)  # ignore short words
)

In [None]:
# Group the tokens by word and count occurrences, most frequent first.
word_freq = (
    df_tokens
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)

# Display the top 20 (the Spark UI shows the corresponding stage).
word_freq.show(20, truncate=False)

In [None]:
import random
import pandas as pd

# Υποθέτουμε ότι το dataset έρχεται ως iterator (stream)
def reservoir_sample(stream, k=100000, seed=None):
    """Draw a uniform random sample of up to `k` items from an iterable.

    Implements reservoir sampling (Algorithm R): the first `k` items fill the
    reservoir; item number `i` (0-based, i >= k) then replaces a random slot
    with probability k / (i + 1). The stream is consumed exactly once and only
    `k` items are kept in memory.

    Args:
        stream: Any iterable; its length does not need to be known.
        k: Maximum number of items to keep.
        seed: Optional seed for a private random generator, making the sample
            reproducible. When None (the default) the module-level `random`
            state is used, exactly as before this parameter existed.

    Returns:
        A list with min(k, number of items in `stream`) sampled items.
    """
    rng = random if seed is None else random.Random(seed)
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)
            continue
        # randint is inclusive on both ends: j is uniform over 0..i.
        j = rng.randint(0, i)
        if j < k:
            reservoir[j] = item
    return reservoir

# Apply the sampler to bookcorpus.csv
import csv

# Seed the module-level generator so the sample is reproducible across runs.
random.seed(42)

# Parse the file with the csv module instead of splitting on raw lines: rows
# containing commas or quotes are stored quoted, and reading them as plain
# lines would keep the CSV quoting characters as part of the text.
with open("bookcorpus.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f)
    next(reader, None)  # skip header
    stream = ({"text": row[0].strip()} for row in reader if row)
    sampled = reservoir_sample(stream, k=100000)

# Save the sample
df_reservoir = pd.DataFrame(sampled)
df_reservoir.to_csv("reservoir_sample.csv", index=False)

In [None]:
# Re-load the saved sample as a Spark DataFrame (from here on `df_reservoir`
# is a Spark object, not a pandas one). The file was written by pandas, which
# escapes embedded double quotes by doubling them, hence escape='"'. Records
# with a null text are dropped so later string operations do not hit nulls.
df_reservoir = (
    spark.read.csv("reservoir_sample.csv", header=True, quote='"', escape='"')
    .na.drop(subset=["text"])
)


In [None]:
from pyspark.sql.functions import explode, lower, regexp_replace, split, col, length

# Clean the sampled text (non-letters become spaces, lower-cased), split it on
# whitespace and emit one row per word, keeping words longer than 2 characters.
cleaned_text = lower(regexp_replace(col("text"), r"[^a-zA-Z]", " "))
sample_words = explode(split(cleaned_text, "\\s+")).alias("word")

df_tokens_rsv = df_reservoir.select(sample_words).filter(length(col("word")) > 2)

# Word frequencies in the sample, most frequent first.
word_freq_rsv = (
    df_tokens_rsv
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)

# Pull the top 20 into pandas
top_rsv = word_freq_rsv.limit(20).toPandas()
top_rsv

In [None]:
!pip install mmh3


In [None]:
import mmh3
import numpy as np
from collections import Counter

class CountMinSketch:
    """Count-Min Sketch: approximate frequency counts in fixed memory.

    The sketch is a `depth` x `width` table of counters. Every key is hashed
    once per row (MurmurHash3 with a different seed per row) and the matching
    counter in each row is incremented. A key's estimate is the minimum of its
    counters, so with non-negative counts it never underestimates; hash
    collisions can only inflate it.

    Attributes:
        width: Number of counters per row.
        depth: Number of rows (independent hash functions).
        table: numpy integer array of shape (depth, width).
        seeds: One hash seed per row (`seed`, `seed + 1`, ...).
    """

    def __init__(self, width=1000, depth=5, seed=42):
        """Create an empty sketch.

        Raises:
            ValueError: if `width` or `depth` is not positive (a zero width
                would otherwise only fail later, on the first modulo).
        """
        if width < 1 or depth < 1:
            raise ValueError("width and depth must be positive integers")
        self.width = width
        self.depth = depth
        self.table = np.zeros((depth, width), dtype=int)
        self.seeds = [seed + i for i in range(depth)]

    def _columns(self, key):
        """Return the column index of `key` in each row, in row order."""
        # mmh3.hash may be negative; Python's % with a positive modulus still
        # yields an index in [0, width).
        return [mmh3.hash(key, s) % self.width for s in self.seeds]

    def add(self, key, count=1):
        """Record `count` occurrences of `key` (one occurrence by default)."""
        for row, column in enumerate(self._columns(key)):
            self.table[row, column] += count

    def estimate(self, key):
        """Return the estimated count of `key`: the minimum over its counters."""
        return min(
            self.table[row, column]
            for row, column in enumerate(self._columns(key))
        )

In [None]:
# Word list from the sample
from collections import Counter
import pandas as pd

# `df_reservoir` is the Spark DataFrame here (it is converted via toPandas()).
sample_text = df_reservoir.toPandas()["text"]
words = sample_text.str.lower().str.replace(r"[^a-z ]", "", regex=True).str.split()
flat_words = [token for tokens in words for token in tokens if len(token) > 2]

# Exact counts, used as ground truth
true_counts = Counter(flat_words)

# Feed every word into the sketch
cms = CountMinSketch(width=1000, depth=5)
for word in flat_words:
    cms.add(word)

# Comparison table for the 20 most frequent words: true count, sketch
# estimate and relative error in percent.
results = []
for word, true_val in true_counts.most_common(20):
    est_val = cms.estimate(word)
    rel_err = 100 * abs(est_val - true_val) / true_val
    results.append((word, true_val, est_val, rel_err))

df_sketch = pd.DataFrame(results, columns=["word", "true", "estimated", "error_%"])
df_sketch

In [None]:
import hashlib
import numpy as np

class FMSketch:
    """Flajolet-Martin style sketch for estimating the number of distinct items.

    Each item is hashed with `num_hashes` salted SHA-1 hashes. For every hash
    function the sketch remembers the largest number of leading zero bits seen
    so far; `estimate()` turns the median of those maxima into a cardinality
    estimate.

    Attributes:
        num_hashes: Number of salted hash functions.
        max_zeroes: Integer array; entry i is the largest leading-zero count
            observed for hash function i (0 while nothing has been added).
    """

    # Correction constant applied by `estimate()`.
    _PHI = 0.77351

    def __init__(self, num_hashes=64):
        """Create an empty sketch; raises ValueError if `num_hashes` < 1."""
        if num_hashes < 1:
            raise ValueError("num_hashes must be a positive integer")
        self.num_hashes = num_hashes
        self.max_zeroes = np.zeros(num_hashes, dtype=int)

    def _hash(self, x, seed):
        """Return the SHA-1 digest of "<x>_<seed>" as a 160-character bit string."""
        salted = f"{x}_{seed}".encode('utf-8')
        digest = hashlib.sha1(salted).hexdigest()
        return bin(int(digest, 16))[2:].zfill(160)  # left-pad to the full 160 bits

    def add(self, x):
        """Fold item `x` into the sketch; adding the same item again is a no-op."""
        for i in range(self.num_hashes):
            zeroes = self._rho(self._hash(x, i))
            if zeroes > self.max_zeroes[i]:
                self.max_zeroes[i] = zeroes

    def _rho(self, b):
        """Return the 0-based position of the first '1' in `b`.

        This equals the number of leading zero bits. It is deliberately
        0-based: `estimate()` computes 2 ** R and a 1-based position would
        double every estimate. A string without any '1' counts as all zeros
        and returns len(b).
        """
        first_one = b.find('1')
        return len(b) if first_one < 0 else first_one

    def estimate(self):
        """Return the estimate 2 ** median(max_zeroes) / 0.77351.

        NOTE(review): the median of per-hash maxima is a coarse estimator; it
        can still be off by a constant factor -- validate against ground truth.
        """
        return 2 ** (np.median(self.max_zeroes)) / self._PHI

In [None]:
# Flat list of all words in the reservoir sample
# (`df_reservoir` is the Spark DataFrame here; it is converted via toPandas()).
sample_text = df_reservoir.toPandas()["text"]
words = sample_text.str.lower().str.replace(r"[^a-z ]", "", regex=True).str.split()
flat_words = [token for tokens in words for token in tokens if len(token) > 2]

# Ground truth: exact number of distinct words
true_unique = len(set(flat_words))

# Estimate with the FM sketch
fm = FMSketch(num_hashes=64)
for w in flat_words:
    fm.add(w)
estimated_unique = int(fm.estimate())

# Relative error of the estimate, in percent
error_percent = abs(estimated_unique - true_unique) / true_unique * 100

print(f"✅ True unique words: {true_unique}")
print(f"✅ Estimated unique words (FM): {estimated_unique}")
print(f"🧮 Relative Error: {error_percent:.2f}%")

In [None]:
import os
import matplotlib.pyplot as plt

# Number of whitespace-separated tokens in each sampled record (pandas Series).
word_lengths = df_reservoir.toPandas()["text"].str.split().map(len)

word_lengths.hist(bins=30, figsize=(8, 4))
plt.title("Histogram of Word Counts per Record")
plt.xlabel("Word Count")
plt.ylabel("Frequency")

# savefig does not create missing directories; make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/histogram_wordcount.png")

In [None]:
!pip install PyWavelets # Install using pip
import pywt # Then import
import numpy as np

counts = np.array(word_lengths)
coeffs = pywt.wavedec(counts, 'db1', level=3)
approx, *details = coeffs
!pip install PyWavelets # Install using pip
import pywt # Then import
import numpy as np

counts = np.array(word_lengths)
coeffs = pywt.wavedec(counts, 'db1', level=3)
approx, *details = coeffs
reconstructed = pywt.waverec(coeffs, 'db1')

compression_ratio = len(approx) / len(counts)
ratio = len(approx) / len(counts)

In [None]:
import os
import pywt
import numpy as np
import matplotlib.pyplot as plt

# Word count of every record in the sample, as a numpy array
word_lengths = df_reservoir.toPandas()["text"].str.split().map(len).to_numpy()

# Wavelet decomposition (3 levels): approximation first, then detail coefficients
coeffs = pywt.wavedec(word_lengths, wavelet='db1', level=3)
approx, *details = coeffs

# Reconstruction, trimmed back to the original length
reconstructed = pywt.waverec(coeffs, wavelet='db1')
reconstructed = reconstructed[:len(word_lengths)]

# Compression ratio: approximation coefficients relative to the input length
compression_ratio = len(approx) / len(word_lengths)

# Plot the first 500 records: original vs reconstructed
plt.figure(figsize=(10, 4))
plt.plot(word_lengths[:500], label="Original")
plt.plot(reconstructed[:500], label="Reconstructed", linestyle='--')
plt.legend()
plt.title("Original vs Wavelet-Reconstructed Word Counts")
plt.tight_layout()

# savefig does not create missing directories; make sure the target exists.
os.makedirs("figures", exist_ok=True)
plt.savefig("figures/wavelet_reconstruction.png")