# Types of Data

### 1. **Structured Data**
   - **Relational Database Data**: 
     - **Example**: A SQL database containing tables like `Customers`, `Orders`, and `Products`.
     - **Columns**: `CustomerID`, `OrderID`, `ProductID`, `OrderDate`, `Quantity`, `Price`.
   - **CSV Files**:
     - **Example**: A CSV file with employee records.
     - **Columns**: `EmployeeID`, `Name`, `Department`, `Salary`, `HireDate`.

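Here is a minimal sketch of reading the employee CSV with Python's built-in `csv` module. It assumes a header row that contains the column names listed above, and the two records are invented for illustration. Every row shares the same columns, so each field can be converted to a fixed type as it is read.

```python
import csv
import io
from datetime import date

# Hypothetical file contents using the columns listed above
raw = """EmployeeID,Name,Department,Salary,HireDate
1,Alice,Engineering,85000,2020-03-15
2,Bob,Marketing,62000,2019-07-01
"""


def load_employees(text):
    """Parse CSV text into dicts, converting the numeric and date columns to typed values."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        record["EmployeeID"] = int(record["EmployeeID"])
        record["Salary"] = int(record["Salary"])
        record["HireDate"] = date.fromisoformat(record["HireDate"])
        rows.append(record)
    return rows


employees = load_employees(raw)
print(employees[0]["Name"], employees[0]["Salary"])  # Alice 85000
```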
### 2. **Semi-Structured Data**
   - **JSON Data**:
     - **Example**: A JSON file containing user profiles.
     - **Structure**:
       ```json
       {
         "user_id": "123",
         "name": "John Doe",
         "preferences": {
           "color": "blue",
           "food": "pizza"
         },
         "purchase_history": [
           {"product_id": "001", "date": "2023-01-01", "amount": 25.5},
           {"product_id": "002", "date": "2023-02-14", "amount": 15.0}
         ]
       }
       ```
   - **XML Data**:
     - **Example**: An XML file containing book information.
     - **Structure**:
       ```xml
       <book>
         <title>Data Engineering 101</title>
         <author>Jane Smith</author>
         <published>2021-05-10</published>
         <price>39.99</price>
       </book>
       ```

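This sketch shows one way to read the two semi-structured samples above with the standard library (`json` and `xml.etree.ElementTree`). Nested JSON objects and arrays become Python dicts and lists. XML element text always arrives as a string, so numeric fields need an explicit conversion.

```python
import json
import xml.etree.ElementTree as ET

# The JSON user profile shown above
profile = json.loads("""
{
  "user_id": "123",
  "name": "John Doe",
  "preferences": {"color": "blue", "food": "pizza"},
  "purchase_history": [
    {"product_id": "001", "date": "2023-01-01", "amount": 25.5},
    {"product_id": "002", "date": "2023-02-14", "amount": 15.0}
  ]
}
""")

# Nested objects become dicts and arrays become lists
total_spent = sum(p["amount"] for p in profile["purchase_history"])
print(profile["preferences"]["color"], total_spent)  # blue 40.5

# The XML book record shown above
book = ET.fromstring("""
<book>
  <title>Data Engineering 101</title>
  <author>Jane Smith</author>
  <published>2021-05-10</published>
  <price>39.99</price>
</book>
""".strip())

# Element text is always a string, so numeric fields are converted explicitly
price = float(book.findtext("price"))
print(book.findtext("title"), price)  # Data Engineering 101 39.99
```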
### 3. **Unstructured Data**
   - **Text Files**:
     - **Example**: A collection of text files with raw customer feedback.
     - **Content**: 
       ```
       "The product is great but the delivery was slow."
       "I love the new features in the latest update!"
       ```
   - **Log Files**:
     - **Example**: Server logs capturing user activities.
     - **Content**:
       ```
       2023-08-13 10:22:34, UserID: 123, Action: Login
       2023-08-13 10:23:01, UserID: 123, Action: Viewed Product, ProductID: 456
       ```

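Log lines like the ones above are stored as plain text, but the structure inside them can be recovered. The sketch below uses a regular expression to turn each line into a dict. It assumes the `timestamp, Key: Value, Key: Value` layout seen in the sample, with no commas inside values. `parse_log_line` is a hypothetical helper name.

```python
import re

# The two log lines shown above
log_lines = [
    "2023-08-13 10:22:34, UserID: 123, Action: Login",
    "2023-08-13 10:23:01, UserID: 123, Action: Viewed Product, ProductID: 456",
]

# Leading timestamp, then the remainder of the line (layout inferred from the sample)
LINE_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\s*(.*)$")


def parse_log_line(line):
    """Turn one log line into a dict, or return None if it does not match the expected layout."""
    match = LINE_PATTERN.match(line)
    if match is None:
        return None
    record = {"timestamp": match.group(1)}
    # The remainder is a comma-separated list of "Key: Value" pairs
    for pair in match.group(2).split(","):
        key, _, value = pair.partition(":")
        record[key.strip()] = value.strip()
    return record


events = [parse_log_line(line) for line in log_lines]
print(events[1])
```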
### 4. **Time-Series Data**
   - **Sensor Data**:
     - **Example**: Data from IoT sensors monitoring temperature and humidity.
     - **Columns**: `Timestamp`, `SensorID`, `Temperature`, `Humidity`.
     - **Sample**:
       ```
       2023-08-13 10:00:00, Sensor_01, 25.4, 60
       2023-08-13 10:05:00, Sensor_01, 25.7, 58
       ```
   - **Financial Data**:
     - **Example**: Stock price data.
     - **Columns**: `Timestamp`, `StockSymbol`, `OpenPrice`, `ClosePrice`, `Volume`.
     - **Sample**:
       ```
       2023-08-13 09:30:00, AAPL, 145.6, 147.3, 50000
       2023-08-13 09:45:00, AAPL, 147.3, 148.0, 62000
       ```

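Time-series data is different because each reading is interpreted relative to its neighbors in time. The minimal sketch below uses the two sensor rows shown above. It parses the timestamps and computes the temperature change between consecutive readings.

```python
from datetime import datetime

# The two sensor samples shown above: Timestamp, SensorID, Temperature, Humidity
samples = [
    "2023-08-13 10:00:00, Sensor_01, 25.4, 60",
    "2023-08-13 10:05:00, Sensor_01, 25.7, 58",
]

readings = []
for line in samples:
    ts, sensor, temp, humidity = (field.strip() for field in line.split(","))
    readings.append((datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), sensor, float(temp), int(humidity)))

# Sort by time, then compare each reading with the one before it
readings.sort(key=lambda r: r[0])
changes = []
for prev, curr in zip(readings, readings[1:]):
    minutes = (curr[0] - prev[0]).total_seconds() / 60
    delta_temp = round(curr[2] - prev[2], 2)  # round away floating-point noise
    changes.append((curr[1], minutes, delta_temp))
    print(f"{curr[1]}: {delta_temp:+} C over {minutes:.0f} min")
```

If one feed mixes several sensors, you would first group the readings by `SensorID` and then pair consecutive rows within each group.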
### 5. **Graph Data**
   - **Social Network Data**:
     - **Example**: A graph dataset representing a social network.
     - **Nodes**: Users.
     - **Edges**: Friend relationships.
     - **Structure**:
       ```json
       {
         "nodes": [
           {"user_id": "123", "name": "Alice"},
           {"user_id": "124", "name": "Bob"}
         ],
         "edges": [
           {"from": "123", "to": "124", "type": "friend"}
         ]
       }
       ```

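A common way to work with the node/edge JSON above is to build an adjacency list. This sketch assumes friendship is mutual, so it stores each `friend` edge in both directions. `friends_of` is a hypothetical helper.

```python
import json
from collections import defaultdict

# The graph JSON shown above
graph = json.loads("""
{
  "nodes": [
    {"user_id": "123", "name": "Alice"},
    {"user_id": "124", "name": "Bob"}
  ],
  "edges": [
    {"from": "123", "to": "124", "type": "friend"}
  ]
}
""")

# Map user_id -> name, and user_id -> set of neighboring user_ids
names = {node["user_id"]: node["name"] for node in graph["nodes"]}
adjacency = defaultdict(set)
for edge in graph["edges"]:
    if edge["type"] == "friend":
        # Treat friendship as undirected: record the edge from both ends
        adjacency[edge["from"]].add(edge["to"])
        adjacency[edge["to"]].add(edge["from"])


def friends_of(user_id):
    """Return the sorted names of a user's direct friends."""
    return sorted(names[friend] for friend in adjacency[user_id])


print(friends_of("123"))  # ['Bob']
```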
### 6. **Geospatial Data**
   - **Location Data**:
     - **Example**: GPS coordinates of delivery trucks.
     - **Columns**: `TruckID`, `Latitude`, `Longitude`, `Timestamp`.
     - **Sample**:
       ```
       TRK_001, 40.7128, -74.0060, 2023-08-13 08:00:00
       TRK_002, 34.0522, -118.2437, 2023-08-13 08:05:00
       ```

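Latitude and longitude are angles on a sphere. A plain Euclidean formula applied to the raw degrees does not give the distance between two truck positions. The sketch below uses the haversine formula, which assumes a spherical Earth with a mean radius of 6371 km (an approximation), and applies it to the two sample positions.

```python
import math


def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance in km between two (latitude, longitude) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    return 2 * radius_km * math.asin(math.sqrt(a))


# Positions of TRK_001 and TRK_002 from the sample above
d = haversine_km(40.7128, -74.0060, 34.0522, -118.2437)
print(f"{d:.0f} km")
```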
### 7. **Image Data**
   - **Image Files**:
     - **Example**: A dataset of labeled images for computer vision tasks.
     - **Structure**: 
       - `Image`: A file representing the image.
       - `Label`: A category label for the image (e.g., "cat", "dog").
     - **File**: `image_001.jpg` with label `cat`.

### 8. **Audio Data**
   - **Audio Files**:
     - **Example**: A collection of audio recordings for speech recognition.
     - **Files**: `audio_001.wav`, `audio_002.wav`.
     - **Metadata**: 
       - `Duration`: Length of the audio.
       - `Transcript`: Text transcription of the speech.

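For WAV files, you can derive the `Duration` metadata field from the file header: divide the number of audio frames by the sample rate. `audio_001.wav` is not available here, so the sketch below writes a two-second silent WAV into memory with Python's `wave` module and then reads its duration back. The sample rate and bit depth are arbitrary choices.

```python
import io
import wave


def wav_duration_seconds(fileobj):
    """Duration = number of audio frames / frames per second (sample rate)."""
    with wave.open(fileobj, "rb") as wav:
        return wav.getnframes() / wav.getframerate()


# Build a 2-second, 16 kHz, mono, 16-bit silent WAV in memory as a stand-in for audio_001.wav
buffer = io.BytesIO()
with wave.open(buffer, "wb") as out:
    out.setnchannels(1)
    out.setsampwidth(2)  # 2 bytes per sample = 16-bit audio
    out.setframerate(16000)
    out.writeframes(b"\x00\x00" * 16000 * 2)
buffer.seek(0)

print(wav_duration_seconds(buffer))  # 2.0
```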
### 9. **Video Data**
   - **Video Files**:
     - **Example**: Security camera footage.
     - **Files**: `video_001.mp4`.
     - **Metadata**: 
       - `Duration`: Length of the video.
       - `Timestamp`: When the video was recorded.

### 10. **Streaming Data**
   - **Real-Time Data Streams**:
     - **Example**: Real-time tweets from the Twitter API.
     - **Structure**: JSON objects with fields like `tweet_id`, `user_id`, `timestamp`, `text`.

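Streaming data is processed record by record as it arrives, rather than loaded all at once. The minimal sketch below counts tweet-like JSON records in a tumbling (fixed, non-overlapping) time window. The field names come from the structure above, but the records are invented. A real pipeline would read from the streaming source instead of a Python list.

```python
import json
from collections import Counter
from datetime import datetime, timedelta

# Invented records using the fields named above; one JSON object per line
raw_stream = [
    '{"tweet_id": "1", "user_id": "u1", "timestamp": "2023-08-13T10:00:05", "text": "hello"}',
    '{"tweet_id": "2", "user_id": "u2", "timestamp": "2023-08-13T10:00:40", "text": "data"}',
    '{"tweet_id": "3", "user_id": "u1", "timestamp": "2023-08-13T10:01:10", "text": "spark"}',
]

EPOCH = datetime(1970, 1, 1)


def window_start(ts, window_seconds):
    """Round a naive timestamp down to the start of its fixed-size window."""
    offset = int((ts - EPOCH).total_seconds()) % window_seconds
    return ts.replace(microsecond=0) - timedelta(seconds=offset)


def tumbling_window_counts(lines, window_seconds=60):
    """Consume records one at a time and count how many fall into each window."""
    counts = Counter()
    for line in lines:  # works with any iterable of lines, e.g. a generator
        event = json.loads(line)
        ts = datetime.fromisoformat(event["timestamp"])
        counts[window_start(ts, window_seconds)] += 1
    return counts


counts = tumbling_window_counts(raw_stream)
for start, n in sorted(counts.items()):
    print(start.isoformat(), n)
```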
These examples can help students understand how to handle, process, and analyze different types of data in various data engineering contexts.

# -------------------------------------------------------------------------

# 1. Structured Data

In [19]:
import pandas as pd

df = pd.read_csv('DataSets/Housing.csv')

df.head(10)

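|   | rownames | price | lotsize | bedrooms | bathrms | stories | driveway | recroom | fullbase | gashw | airco | garagepl | prefarea |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 42000 | 5850 | 3 | 1 | 2 | yes | no | yes | no | no | 1 | no |
| 1 | 2 | 38500 | 4000 | 2 | 1 | 1 | yes | no | no | no | no | 0 | no |
| 2 | 3 | 49500 | 3060 | 3 | 1 | 1 | yes | no | no | no | no | 0 | no |
| 3 | 4 | 60500 | 6650 | 3 | 1 | 2 | yes | yes | no | no | no | 0 | no |
| 4 | 5 | 61000 | 6360 | 2 | 1 | 1 | yes | no | no | no | no | 0 | no |
| 5 | 6 | 66000 | 4160 | 3 | 1 | 1 | yes | yes | yes | no | yes | 0 | no |
| 6 | 7 | 66000 | 3880 | 3 | 2 | 2 | yes | no | yes | no | no | 2 | no |
| 7 | 8 | 69000 | 4160 | 3 | 1 | 3 | yes | no | no | no | no | 0 | no |
| 8 | 9 | 83800 | 4800 | 3 | 1 | 1 | yes | yes | yes | no | no | 0 | no |
| 9 | 10 | 88500 | 5500 | 3 | 2 | 4 | yes | yes | no | no | yes | 1 | no |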


In [20]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 546 entries, 0 to 545
Data columns (total 13 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   rownames  546 non-null    int64 
 1   price     546 non-null    int64 
 2   lotsize   546 non-null    int64 
 3   bedrooms  546 non-null    int64 
 4   bathrms   546 non-null    int64 
 5   stories   546 non-null    int64 
 6   driveway  546 non-null    object
 7   recroom   546 non-null    object
 8   fullbase  546 non-null    object
 9   gashw     546 non-null    object
 10  airco     546 non-null    object
 11  garagepl  546 non-null    int64 
 12  prefarea  546 non-null    object
dtypes: int64(7), object(6)
memory usage: 55.6+ KB


In [24]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .getOrCreate()

# Verify the Spark session is created
print("Spark Session Created")

# Read the CSV file into a DataFrame
spdf = spark.read.csv('DataSets/Housing.csv', header=True, inferSchema=True)

# Show the DataFrame
spdf.show()

# Stop the Spark session
spark.stop()

24/08/14 09:32:14 INFO SparkContext: Running Spark version 3.5.2
24/08/14 09:32:14 INFO SparkContext: OS info Mac OS X, 14.5, aarch64
24/08/14 09:32:14 INFO SparkContext: Java version 22.0.1

Spark Session Created



+--------+-----+-------+--------+-------+-------+--------+-------+--------+-----+-----+--------+--------+
|rownames|price|lotsize|bedrooms|bathrms|stories|driveway|recroom|fullbase|gashw|airco|garagepl|prefarea|
+--------+-----+-------+--------+-------+-------+--------+-------+--------+-----+-----+--------+--------+
|       1|42000|   5850|       3|      1|      2|     yes|     no|     yes|   no|   no|       1|      no|
|       2|38500|   4000|       2|      1|      1|     yes|     no|      no|   no|   no|       0|      no|
|       3|49500|   3060|       3|      1|      1|     yes|     no|      no|   no|   no|       0|      no|
|       4|60500|   6650|       3|      1|      2|     yes|    yes|      no|   no|   no|       0|      no|
|       5|61000|   6360|       2|      1|      1|     yes|     no|      no|   no|   no|       0|      no|
|       6|66000|   4160|       3|      1|      1|     yes|    yes|     yes|   no|  yes|       0|      no|
|       7|66000|   3880|       3|      2|      2|     yes|     no|     yes|   no|   no|       2|      no|


In [107]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("SparkSQLExample") \
    .getOrCreate()

# Create a DataFrame
data = [("John", 28), ("Anna", 24), ("Mike", 32)]
columns = ["Name", "Age"]

df = spark.createDataFrame(data, columns)

# Create a temporary view
df.createOrReplaceTempView("people")

# Run SQL query
result = spark.sql("SELECT * FROM people WHERE Age > 25")

# Show the result
result.show()

# Stop the SparkSession
# spark.stop()


24/08/16 09:53:23 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

+----+---+
|Name|Age|
+----+---+
|John| 28|
|Mike| 32|
+----+---+




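The temporary view `people` behaves like an ordinary SQL table. For comparison only, you can run the same query against an in-memory database with Python's built-in `sqlite3`. The `ORDER BY` is added because SQL does not otherwise guarantee row order.

```python
import sqlite3

# Same rows and filter as the Spark SQL cell, in an in-memory SQLite database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (Name TEXT, Age INTEGER)")
conn.executemany("INSERT INTO people VALUES (?, ?)", [("John", 28), ("Anna", 24), ("Mike", 32)])

rows = conn.execute("SELECT * FROM people WHERE Age > 25 ORDER BY Name").fetchall()
print(rows)  # [('John', 28), ('Mike', 32)]
conn.close()
```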
In [None]:
from pyspark.sql import SparkSession

# Specify the path to the MySQL JDBC driver
jdbc_driver_path = "/path/to/mysql-connector-java-8.0.32.jar"

# Initialize a Spark session with the JDBC driver
spark = SparkSession.builder \
    .appName("MySQL-PySpark-Connection") \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

# MySQL connection properties
jdbc_url = "jdbc:mysql://localhost:3306/mydatabase"
connection_properties = {
    "user": "root",
    "password": "your_password",
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Fetch data from a MySQL table
df = spark.read.jdbc(url=jdbc_url, table="my_table", properties=connection_properties)

# Show the data
df.show()

# Stop the Spark session
spark.stop()


In [123]:
import pandas as pd
import glob

# Path pattern to match CSV files
path_pattern = '/Users/paritoshsharma/Desktop/Machine_Learning_Training/Ram/Data/*.csv'

# Use glob to get the list of CSV files
all_files = glob.glob(path_pattern)

# Read every CSV, skipping the 10 metadata lines above each file's header, and concatenate them
tf = pd.concat((pd.read_csv(f, skiprows=10) for f in all_files), ignore_index=True)

# Display the combined DataFrame
print(tf.head())


# tf = pd.read_csv('/Users/paritoshsharma/Desktop/Machine_Learning_Training/Ram/Data/input_Input_Data_13.csv', skiprows=10)

   area   employee id  in time(days)  out time  total income
0      3           16             24        68          4284
1     44           16             29        10          3412
2     32           16             33        98          3105
3     70           16             36        59          3454
4     49           16             48         9          1691


In [124]:
sptf = spark.createDataFrame(tf)

In [125]:
sptf.show()

+-----+-----------+-------------+--------+------------+
|area |employee id|in time(days)|out time|total income|
+-----+-----------+-------------+--------+------------+
|    3|         16|           24|      68|        4284|
|   44|         16|           29|      10|        3412|
|   32|         16|           33|      98|        3105|
|   70|         16|           36|      59|        3454|
|   49|         16|           48|       9|        1691|
|   74|         16|           40|      89|        3912|
|   87|         16|           43|      78|        3183|
|   69|         16|           23|      28|        4651|
|   79|         16|           27|      28|        1666|
|   12|         16|            5|       3|        4965|
|   63|         16|           24|      78|        1562|
|   30|         16|           39|      84|        1622|
|   96|         16|           12|      62|        2818|
|   39|         16|           33|      56|        1549|
|   41|         16|           28|       8|      


In [126]:
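# Area-weighted products (area * value). Summing these per employee and dividing by
# the per-employee sum of 'area ' gives an area-weighted average of each value.
sptf2 = sptf.withColumn("Average", sptf['area '] * sptf['total income']) \
            .withColumn("OUTAverage", sptf['area '] * sptf['out time']) \
            .withColumn("INAverage", sptf['area '] * sptf['in time(days)'])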

In [127]:
sptf2.show()

+-----+-----------+-------------+--------+------------+-------+----------+---------+
|area |employee id|in time(days)|out time|total income|Average|OUTAverage|INAverage|
+-----+-----------+-------------+--------+------------+-------+----------+---------+
|    3|         16|           24|      68|        4284|  12852|       204|       72|
|   44|         16|           29|      10|        3412| 150128|       440|     1276|
|   32|         16|           33|      98|        3105|  99360|      3136|     1056|
|   70|         16|           36|      59|        3454| 241780|      4130|     2520|
|   49|         16|           48|       9|        1691|  82859|       441|     2352|
|   74|         16|           40|      89|        3912| 289488|      6586|     2960|
|   87|         16|           43|      78|        3183| 276921|      6786|     3741|
|   69|         16|           23|      28|        4651| 320919|      1932|     1587|
|   79|         16|           27|      28|        1666| 131614|      2212|     2133|


In [119]:
# Sum of area * total income per employee (divide by the per-employee sum of 'area ' for the weighted average)
sptf2.groupBy('employee id').sum('Average').show(truncate=False)

+-----------+------------+
|employee id|sum(Average)|
+-----------+------------+
|13         |239901859   |
+-----------+------------+




In [105]:
# Area-weighted products (area * value) for each metric
tf['Average'] = tf['area '] * tf['total income']
tf['OUTAverage'] = tf['area '] * tf['out time']
tf['INAverage'] = tf['area '] * tf['in time(days)']

In [106]:
tf.head()

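|   | area | employee id | in time(days) | out time | total income | Average | OUTAverage | INAverage |
|---|---|---|---|---|---|---|---|---|
| 0 | 10 | 13 | 50 | 69 | 1494 | 14940 | 690 | 500 |
| 1 | 200 | 13 | 36 | 46 | 1296 | 259200 | 9200 | 7200 |
| 2 | 200 | 13 | 15 | 46 | 1195 | 239000 | 9200 | 3000 |
| 3 | 200 | 13 | 11 | 46 | 1192 | 238400 | 9200 | 2200 |
| 4 | 30 | 13 | 18 | 47 | 1267 | 38010 | 1410 | 540 |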


In [102]:
# Area-weighted averages per employee: sum(area * value) / sum(area)
grouped = tf.groupby('employee id')[['area ', 'Average', 'OUTAverage', 'INAverage']].sum()
ave = grouped['Average'] / grouped['area ']          # weighted total income
ave_out = grouped['OUTAverage'] / grouped['area ']   # weighted out time
ave_in = grouped['INAverage'] / grouped['area ']     # weighted in time

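For each employee, the ratio `sum(area * value) / sum(area)` computed above is an area-weighted mean. Below is a dependency-free sketch of the same formula. It uses the first three rows of the `tf.head()` output as input, and `weighted_average_by_group` is a hypothetical helper name.

```python
from collections import defaultdict


def weighted_average_by_group(rows, group_key, weight_key, value_key):
    """Return {group: sum(weight * value) / sum(weight)}, skipping groups whose weights sum to zero."""
    weighted_sum = defaultdict(float)
    weight_total = defaultdict(float)
    for row in rows:
        group = row[group_key]
        weighted_sum[group] += row[weight_key] * row[value_key]
        weight_total[group] += row[weight_key]
    return {g: weighted_sum[g] / weight_total[g] for g in weighted_sum if weight_total[g] != 0}


# First three rows of the tf.head() output (column names as in the notebook, including 'area ')
rows = [
    {"employee id": 13, "area ": 10, "total income": 1494},
    {"employee id": 13, "area ": 200, "total income": 1296},
    {"employee id": 13, "area ": 200, "total income": 1195},
]
result = weighted_average_by_group(rows, "employee id", "area ", "total income")
print(result)
```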
In [101]:
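# Sum every numeric column per employee; the first argument of sum() is numeric_only, not a column name
tf.groupby('employee id').sum(numeric_only=True)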

| employee id | area | in time(days) | out time | total income | Average |
|---|---|---|---|---|---|
| 13 | 102411 | 104969 | 179150 | 8382457 | 239901859 |


In [99]:
tf.groupby('employee id').sum(numeric_only=True).reset_index()

|   | employee id | area | in time(days) | out time | total income | Average |
|---|---|---|---|---|---|---|
| 0 | 13 | 102411 | 104969 | 179150 | 8382457 | 239901859 |


In [104]:
min(ave)

2342.5399517629944

In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Read CSV with Header Starting from 11th Row") \
    .getOrCreate()

# Path to the CSV file
csv_file_path = "/Users/paritoshsharma/Desktop/Machine_Learning_Training/Ram/Data/input_Input_Data_16.csv"

# Read the entire CSV file as a text file
rdd = spark.sparkContext.textFile(csv_file_path)

# Skip the first 10 lines (metadata that precedes the real header on line 11)
filtered_rdd = rdd.zipWithIndex().filter(lambda x: x[1] >= 10).keys()

# Split each line on commas and convert the RDD to a DataFrame.
# Note: this naive split ignores quoted commas, and every column is read as a string.
df = filtered_rdd.map(lambda line: line.split(",")).toDF()

# The first remaining row (line 11 of the file) holds the column names
header = df.first()

# Create a DataFrame with the header
df_with_header = df.toDF(*header)

# Filter out the header row from the DataFrame
df_with_header = df_with_header.filter(df_with_header[header[0]] != header[0])

# Show the DataFrame
df_with_header.show()




+-----+-----------+-------------+--------+------------+
|area |employee id|in time(days)|out time|total income|
+-----+-----------+-------------+--------+------------+
|    3|         16|           24|      68|        4284|
|   44|         16|           29|      10|        3412|
|   32|         16|           33|      98|        3105|
|   70|         16|           36|      59|        3454|
|   49|         16|           48|       9|        1691|
|   74|         16|           40|      89|        3912|
|   87|         16|           43|      78|        3183|
|   69|         16|           23|      28|        4651|
|   79|         16|           27|      28|        1666|
|   12|         16|            5|       3|        4965|
|   63|         16|           24|      78|        1562|
|   30|         16|           39|      84|        1622|
|   96|         16|           12|      62|        2818|
|   39|         16|           33|      56|        1549|
|   41|         16|           28|       8|      


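The RDD approach above, like `skiprows=10` in the pandas cell, skips a block of metadata lines before the real header. Below is a standard-library sketch under the same assumption about the file layout. Unlike `line.split(",")`, the `csv` module handles quoted fields that contain commas. The two metadata lines in the sample are invented.

```python
import csv
import io
from itertools import islice


def read_csv_after_preamble(fileobj, skip_lines=10):
    """Skip `skip_lines` metadata lines, then parse the rest as CSV using the next line as the header."""
    return list(csv.DictReader(islice(fileobj, skip_lines, None)))


# Hypothetical file: two metadata lines, then the header and data rows
sample = io.StringIO(
    "Report generated 2024-08-15\n"
    "Source: example\n"
    "area ,employee id,in time(days),out time,total income\n"
    "3,16,24,68,4284\n"
    "44,16,29,10,3412\n"
)
rows = read_csv_after_preamble(sample, skip_lines=2)
print(rows[0])
```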
# 2. Semi-Structured Data

## NoSQL or JSON Data


In [57]:
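import os

from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

# Read the connection string from an environment variable instead of hard-coding credentials.
# MONGODB_URI is a name chosen for this example; its value has the form
# mongodb+srv://<username>:<password>@cluster0.okdze.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0
uri = os.environ["MONGODB_URI"]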

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [64]:
# Select the database (replace 'sample_mflix' with your database name)
db = client['sample_mflix']

# Select the collection
collection = db['movies']

# Fetch a single document from the collection
document = collection.find_one()

print(document)

# Load a bounded number of documents into a DataFrame (a Collection itself is not a list of dicts)
df = pd.DataFrame(list(collection.find().limit(100)))

{'_id': ObjectId('573a1390f29313caabcd42e8'), 'plot': 'A group of bandits stage a brazen train hold-up, only to find a determined posse hot on their heels.', 'genres': ['Short', 'Western'], 'runtime': 11, 'cast': ['A.C. Abadie', "Gilbert M. 'Broncho Billy' Anderson", 'George Barnes', 'Justus D. Barnes'], 'poster': 'https://m.media-amazon.com/images/M/MV5BMTU3NjE5NzYtYTYyNS00MDVmLWIwYjgtMmYwYWIxZDYyNzU2XkEyXkFqcGdeQXVyNzQzNzQxNzI@._V1_SY1000_SX677_AL_.jpg', 'title': 'The Great Train Robbery', 'fullplot': "Among the earliest existing films in American cinema - notable as the first film that presented a narrative story to tell - it depicts a group of cowboy outlaws who hold up a train and rob the passengers. They are then pursued by a Sheriff's posse. Several scenes have color included - all hand tinted.", 'languages': ['English'], 'released': datetime.datetime(1903, 12, 1, 0, 0), 'directors': ['Edwin S. Porter'], 'rated': 'TV-G', 'awards': {'wins': 1, 'nominations': 0, 'text': '1 win.'},

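MongoDB documents like the one printed above contain nested objects such as `awards`, `imdb` and `tomatoes`, which do not map directly onto DataFrame columns. Below is a minimal sketch that flattens nested dicts into dotted column names and leaves lists untouched. pandas offers `pandas.json_normalize` for the same purpose. The `movie` dict is a trimmed copy of the document above.

```python
def flatten(doc, parent_key="", sep="."):
    """Flatten nested dicts into dotted keys, e.g. {'awards': {'wins': 1}} -> {'awards.wins': 1}.
    Lists and other values are kept as-is."""
    flat = {}
    for key, value in doc.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


# A trimmed-down copy of the document printed above
movie = {
    "title": "The Great Train Robbery",
    "runtime": 11,
    "genres": ["Short", "Western"],
    "awards": {"wins": 1, "nominations": 0, "text": "1 win."},
}
flat = flatten(movie)
print(flat)
```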
In [67]:
# Example: fetch documents whose 'runtime' field is greater than 25 (minutes)
query = {"runtime": {"$gt": 25}}
d0 = collection.find(query)

for d in d0:
    print(d)

{'_id': ObjectId('573a1390f29313caabcd4eaf'), 'plot': 'A woman, with the aid of her police officer sweetheart, endeavors to uncover the prostitution ring that has kidnapped her sister, and the philanthropist who secretly runs it.', 'genres': ['Crime', 'Drama'], 'runtime': 88, 'cast': ['Jane Gail', 'Ethel Grandin', 'William H. Turner', 'Matt Moore'], 'num_mflix_comments': 1, 'poster': 'https://m.media-amazon.com/images/M/MV5BYzk0YWQzMGYtYTM5MC00NjM2LWE5YzYtMjgyNDVhZDg1N2YzXkEyXkFqcGdeQXVyMzE0MjY5ODA@._V1_SY1000_SX677_AL_.jpg', 'title': 'Traffic in Souls', 'lastupdated': '2015-09-15 02:07:14.247000000', 'languages': ['English'], 'released': datetime.datetime(1913, 11, 24, 0, 0), 'directors': ['George Loane Tucker'], 'rated': 'TV-PG', 'awards': {'wins': 1, 'nominations': 0, 'text': '1 win.'}, 'year': 1913, 'imdb': {'rating': 6.0, 'votes': 371, 'id': 3471}, 'countries': ['USA'], 'type': 'movie', 'tomatoes': {'viewer': {'rating': 3.0, 'numReviews': 85, 'meter': 57}, 'dvd': datetime.datetime

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [80]:
client.close()

In [81]:
# This fails: the client was closed in the previous cell, and a closed MongoClient cannot run further operations
document = collection.find_one()

print(document)

InvalidOperation: Cannot use MongoClient after close