In [1]:
import os
import io
import json

import minio
from minio import Minio
from pymongo import MongoClient
from pyspark.sql import SparkSession

# Dataset.

Przykładowy rekord.

In [2]:
dataset = open("../../data/arxiv-metadata-oai-snapshot.json")

In [3]:
example = json.loads(next(dataset))
print(example)

{'id': '0704.0001', 'submitter': 'Pavel Nadolsky', 'authors': "C. Bal\\'azs, E. L. Berger, P. M. Nadolsky, C.-P. Yuan", 'title': 'Calculation of prompt diphoton production cross sections at Tevatron and\n  LHC energies', 'comments': '37 pages, 15 figures; published version', 'journal-ref': 'Phys.Rev.D76:013009,2007', 'doi': '10.1103/PhysRevD.76.013009', 'report-no': 'ANL-HEP-PR-07-12', 'categories': 'hep-ph', 'license': None, 'abstract': '  A fully differential calculation in perturbative quantum chromodynamics is\npresented for the production of massive photon pairs at hadron colliders. All\nnext-to-leading order perturbative contributions from quark-antiquark,\ngluon-(anti)quark, and gluon-gluon subprocesses are included, as well as\nall-orders resummation of initial-state gluon radiation valid at\nnext-to-next-to-leading logarithmic accuracy. The region of phase space is\nspecified in which the calculation is most reliable. Good agreement is\ndemonstrated with data from the Fermilab

## MongoDB.

Połącz się z MongoDB.

In [4]:
client = MongoClient("mongodb://root:example@mongo:27017/admin")
db = client["arxiv"]

Sprawdź czy baza danych została utworzona.

In [5]:
client.list_database_names()

['admin', 'arxiv', 'config', 'local']

Utwórz kolekcję.

In [6]:
collection = db["papers"]

Wstaw przykładowy rekord danych.

In [7]:
collection.insert_one(example)

InsertOneResult(ObjectId('6652e92b341cf052182941c8'), acknowledged=True)

Sprawdź czy baza danych została utworzona.

In [8]:
client.list_database_names()

['admin', 'arxiv', 'config', 'local']

Konfiguracja sesji Spark.

In [9]:
spark = (
    SparkSession.builder.appName("read-json-from-mongo")
    .master("spark://spark:7077")
    .config(
        "spark.mongodb.read.connection.uri",
        "mongodb://root:example@mongo:27017/admin",
    )
    .config("spark.mongodb.read.database", "arxiv")
    .config("spark.mongodb.read.collection", "papers")
    .config(
        "spark.mongodb.write.connection.uri",
        "mongodb://root:example@mongo:27017/admin/arxiv.papers",
    )
    .config("spark.mongodb.write.database", "arxiv")
    .config("spark.mongodb.read.collection", "papers")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.1")
    .getOrCreate()
)

24/05/26 09:47:56 WARN Utils: Your hostname, MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.0.193 instead (on interface en0)
24/05/26 09:47:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/lukasz/.ivy2/cache
The jars for the packages stored in: /Users/lukasz/.ivy2/jars
org.mongodb.spark#mongo-spark-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-141bbaa1-d1dd-4313-8e5f-0529d733ffea;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector;10.0.1 in central


:: loading settings :: url = jar:file:/Users/lukasz/Documents/Dokumenty%20%e2%80%94%20MacBook%20Air%20(Lukasz)/edu/pw-big-data-thesis/venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.mongodb#mongodb-driver-sync;4.5.1 in central
	[4.5.1] org.mongodb#mongodb-driver-sync;[4.5.0,4.5.99)
	found org.mongodb#bson;4.5.1 in central
	found org.mongodb#mongodb-driver-core;4.5.1 in central
:: resolution report :: resolve 2093ms :: artifacts dl 4ms
	:: modules in use:
	org.mongodb#bson;4.5.1 from central in [default]
	org.mongodb#mongodb-driver-core;4.5.1 from central in [default]
	org.mongodb#mongodb-driver-sync;4.5.1 from central in [default]
	org.mongodb.spark#mongo-spark-connector;10.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   4   |   1   |   0   |   0   ||   4   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spar

In [10]:
spark

In [11]:
df = spark.read.format("mongodb").load()

In [12]:
df

DataFrame[_id: string, abstract: string, authors: string, authors_parsed: array<array<string>>, categories: string, comments: string, doi: string, id: string, journal-ref: string, license: void, report-no: string, submitter: string, title: string, update_date: string, versions: array<struct<version:string,created:string>>]

In [13]:
spark.stop()

# Filter.

In [1]:
import logging

import pandas as pd
import pyspark.sql.functions as F
from dotenv import dotenv_values
from transformers import pipeline

from rag.clients import setup_spark_session


logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

In [6]:
LOGGER.info("Setting up Spark...")
spark = setup_spark_session()
LOGGER.info("Reading data from MongoDB...")
ENV = dict()
source = "s3a://papers/100k"
data = (
    spark.read.format("mongodb")
    .option("database", ENV.get("MONGO_DATABASE", "arxiv"))
    .option("collection", ENV.get("MONGO_COLLECTION", "sentences"))
    .option("pipeline", f"{{ $match: {{ source: '{source}' }} }}")
    .load()
)

INFO:__main__:Setting up Spark...
INFO:__main__:Reading data from MongoDB...


In [7]:
data.show(10)

+---------+--------------------+--------------------+----------------+
|      _id|           full_text|           sentences|          source|
+---------+--------------------+--------------------+----------------+
|0704.0001|  A fully differe...|[A fully differen...|s3a:/papers/100k|
|0704.0002|  We describe a n...|[We describe a ne...|s3a:/papers/100k|
|0704.0003|  The evolution o...|[The evolution of...|s3a:/papers/100k|
|0704.0004|  We show that a ...|[We show that a d...|s3a:/papers/100k|
|0704.0005|  In this paper w...|[In this paper we...|s3a:/papers/100k|
|0704.0006|  We study the tw...|[We study the two...|s3a:/papers/100k|
|0704.0007|  A rather non-st...|[A rather non-sta...|s3a:/papers/100k|
|0704.0008|  A general formu...|[A general formul...|s3a:/papers/100k|
|0704.0009|  We discuss the ...|[We discuss the r...|s3a:/papers/100k|
|0704.0010|  Partial cubes a...|[Partial cubes ar...|s3a:/papers/100k|
+---------+--------------------+--------------------+----------------+
only s

In [8]:
data.count()

                                                                                

200000