## Step 1: Getting ready of Spark and MongoDB instances

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pymongo import MongoClient

In [10]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DataIngestion") \
    .getOrCreate()

# MongoDB connection string
mongo_uri = "mongodb://localhost:27017/myDatabase"
client = MongoClient(mongo_uri)
db = client['myDatabase']

## Step 2: Read first few lines of each dataset files

In [14]:
import pandas as pd
import json
import xml.etree.ElementTree as ET

import os

# Define the base path for the "demo_data" folder
base_path = os.path.join(os.getcwd(), "demo_data")

# Ensure the "demo_data" directory exists
os.makedirs(base_path, exist_ok=True)

# File paths
csv_file_path = os.path.join(base_path, "datafile.csv")
json_file_path = os.path.join(base_path, "datafile.json")
xml_file_path = os.path.join(base_path, "datafile.xml")
txt_file_path = os.path.join(base_path, "datafile.txt")

Generate demo dataset: 

- csv content
- json content
- xml content
- txt content

In [7]:
csv_content = """title,author,published
Moby Dick,Herman Melville,1851
Pride and Prejudice,Jane Austen,1813
1984,George Orwell,1949
"""

csv_file_path = os.path.join(base_path, "datafile.csv")
with open(csv_file_path, 'w') as file:
    file.write(csv_content)
    
json_content = [
    {"title": "Moby Dick", "author": "Herman Melville", "published": 1851},
    {"title": "Pride and Prejudice", "author": "Jane Austen", "published": 1813},
    {"title": "1984", "author": "George Orwell", "published": 1949}
]

json_file_path = os.path.join(base_path, "datafile.json")
with open(json_file_path, 'w') as file:
    json.dump(json_content, file, indent=4)

xml_content = """<books>
    <book>
        <title>Moby Dick</title>
        <author>Herman Melville</author>
        <published>1851</published>
    </book>
    <book>
        <title>Pride and Prejudice</title>
        <author>Jane Austen</author>
        <published>1813</published>
    </book>
    <book>
        <title>1984</title>
        <author>George Orwell</author>
        <published>1949</published>
    </book>
</books>
"""

xml_file_path = os.path.join(base_path, "datafile.xml")
with open(xml_file_path, 'w') as file:
    file.write(xml_content)

txt_content = """title\tauthor\tpublished
Moby Dick\tHerman Melville\t1851
Pride and Prejudice\tJane Austen\t1813
1984\tGeorge Orwell\t1949
"""

txt_file_path = os.path.join(base_path, "datafile.txt")
with open(txt_file_path, 'w') as file:
    file.write(txt_content)

In [25]:
# Step 2: Read first few lines of each dataset files

# Read a few lines from CSV
df_csv = spark.read.format("csv").option("header", "true").load(csv_file_path)
df_csv.show(5)

+-------------------+---------------+---------+
|              title|         author|published|
+-------------------+---------------+---------+
|          Moby Dick|Herman Melville|     1851|
|Pride and Prejudice|    Jane Austen|     1813|
|               1984|  George Orwell|     1949|
+-------------------+---------------+---------+



In [6]:
import json
from pprint import pprint

print(json_file_path)

# Read the JSON file with a specified encoding (e.g., 'utf-8')
with open(json_file_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# Print the JSON data
# If it's a list of records, you might want to iterate through it
if isinstance(data, list):
    for record in data:
        pprint(record)
else:  # It's a single JSON object
    pprint(data)

c:\Users\kinla\Documents\All_github_repo\Big-Data-Integration-and-Processing-Course\Part 2 - Individual Skillsets\demo_data\datafile.json
{'metadata': {'creator': 'Lugo, Juan de, cardinale, ; 1583-1660>',
              'description': "[12], 552, [12] p. ; fol,Stemma calcogr. dell'A. "
                             'sul front,Front. stampato in rosso e nero,Testo '
                             'su due col,Fregi xilogr,Segn.: a⁶ A-3A⁶,Ultima '
                             'p. bianca',
              'identifier': 'bub_gb_BmuI0L258yoC',
              'text_link': '...',
              'title': 'R.P. Ioannis de Lugo Hispalensis ... ',
              'year': 1670},
 'text_content': '.1  > \n'
                 '\n'
                 '\n'
                 'Digtized  t)v  Goo  »■ \n'
                 '\n'
                 "' »7 \n"
                 '\n'
                 '\n'
                 '•V  i \n'
                 '\n'
                 '\n'
                 '$ R . P.  I O A N N I S \n'
       

Reading JSON
Related document: https://medium.com/@uzzaman.ahmed/introduction-to-pyspark-json-api-read-and-write-with-parameters-3cca3490e448

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, StructType

# Define the schema based on the JSON structure
metadata_schema = StructType([
    StructField("creator", StringType(), True),
    StructField("description", StringType(), True),
    StructField("identifier", StringType(), True),
    StructField("text_link", StringType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True)
])

schema = StructType([
    StructField("metadata", metadata_schema),
    StructField("text_content", StringType(), True)
])

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read JSON file with PySpark") \
    .getOrCreate()

# Replace this with your actual json file path
json_file_path =  json_file_path
print(json_file_path)

# Read the JSON file using Spark with the defined schema
df = spark.read.schema(schema).json(json_file_path)

# Print the schema to understand the DataFrame structure
df.printSchema()

# Show the DataFrame content
df.show(1)

# Stop the SparkSession
spark.stop()

c:/Users/kinla/Documents/All_github_repo/Big-Data-Integration-and-Processing-Course/Part 2 - Individual Skillsets/demo_data/datafile.json
root
 |-- metadata: struct (nullable = true)
 |    |-- creator: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- identifier: string (nullable = true)
 |    |-- text_link: string (nullable = true)
 |    |-- title: string (nullable = true)
 |    |-- year: integer (nullable = true)
 |-- text_content: string (nullable = true)

+--------+------------+
|metadata|text_content|
+--------+------------+
|NULL    |NULL        |
+--------+------------+
only showing top 1 row



In [33]:

# Read a few lines from XML
df_xml = spark.read.format('com.databricks.spark.xml').options(rowTag='your_row_tag').load(xml_file_path)
df_xml.show(5)

# Read a few lines from TXT tab-limited
df_txt = spark.read.option("delimiter", "\t").csv(txt_file_path)
df_txt.show(5)

Py4JJavaError: An error occurred while calling o247.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: com.databricks.spark.xml. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: com.databricks.spark.xml.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


## Step 3: Data population and Data fusion into 

In [34]:
# Convert Spark DataFrame to Pandas DataFrame and insert into MongoDB
pd_df_csv = df_csv.limit(5).toPandas()
db.csv_collection.insert_many(pd_df_csv.to_dict('records'))

pd_df_json = df_json.limit(5).toPandas()
db.json_collection.insert_many(pd_df_json.to_dict('records'))

pd_df_xml = df_xml.limit(5).toPandas()
db.xml_collection.insert_many(pd_df_xml.to_dict('records'))

pd_df_txt = df_txt.limit(5).toPandas()
db.txt_collection.insert_many(pd_df_txt.to_dict('records'))

print("Data has been populated into MongoDB collections.")

NameError: name 'df_xml' is not defined