In [None]:
#https://spark.apache.org/docs/latest/
#install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=3cabdf3e432558538dabb5fe90d7bd0ad27c78ed124879d042b8f7dafcb008d4
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [None]:
#import SparkSession
from pyspark.sql import SparkSession

The line `spark = SparkSession.builder.getOrCreate()` is used in Apache Spark, a big data processing framework, to create or retrieve a Spark session.

Here's a breakdown of the components:

- **SparkSession**: It is the entry point to programming with Spark. It allows you to create DataFrames, access Spark’s built-in functionalities, and manage Spark configurations.

- **builder**: This is a class attribute of `SparkSession` that returns a `Builder` object, which is used to configure a session (for example with `appName()` or `master()`) before creating it.

- **getOrCreate()**: This method either retrieves an existing Spark session if one is already created or creates a new one if none exists.

In summary, this line initializes a Spark session, allowing you to work with Spark functionalities in your application.


In [None]:
#Create a SparkSession
spark=SparkSession.builder.getOrCreate()

In [None]:
#list
dataList=[("Java",20000),("Python",100000),("Scala",3000)]

The line `rdd = spark.sparkContext.parallelize(dataList)` is used in Apache Spark to create a Resilient Distributed Dataset (RDD) from a local collection.

Here's a breakdown of the components:

- **spark**: This refers to the previously created Spark session.

- **sparkContext**: This is an object that allows you to interact with Spark on a lower level. It provides functions to create RDDs and to manage the Spark application's context.

- **parallelize(dataList)**: This method takes a local collection (in this case, `dataList`) and distributes it across the cluster, creating an RDD from the data. The RDD can then be processed in parallel.

In summary, this line converts a local list (`dataList`) into an RDD, enabling distributed data processing with Spark.
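
To make "distributes it across the cluster" more concrete, here is a minimal plain-Python sketch of splitting a local list into contiguous, near-equal chunks. The helper `split_into_partitions` is hypothetical and is not part of PySpark; Spark's actual partition boundaries depend on the number of slices and on its configuration, so treat this only as an illustration of the idea.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous, near-equal chunks.

    Hypothetical helper for illustration only; not Spark's implementation.
    """
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]

# Each inner list stands in for the data one worker would process.
partitions = split_into_partitions(dataList, 2)
print(partitions)
```

With a real RDD, `rdd.getNumPartitions()` reports how many partitions Spark created, and `rdd.glom().collect()` shows the elements grouped by partition.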



In [None]:
#Create A Resilient Distributed Dataset (RDD) from a list
rdd=spark.sparkContext.parallelize(dataList)

In [None]:
#print(rdd)
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

In [None]:
#To count Number of elements
rdd.count()

3

In [None]:
#To print Resilient Distributed Dataset (RDD)
rdd.collect()

[('Java', 20000), ('Python', 100000), ('Scala', 3000)]

In [None]:
#To print first element in rdd
rdd.first()

('Java', 20000)

In [None]:
#To print first two elements in rdd
rdd.take(2)

[('Java', 20000), ('Python', 100000)]

In [None]:
#To print the elements using index
rdd.collect()[-1]

('Scala', 3000)

In [None]:
rdd.collect()[-2]

('Python', 100000)

In [None]:
#To import data sets
from google.colab import files
uploaded = files.upload()

Saving apple.txt to apple.txt


In [None]:
# to read a text file
df = spark.read.text("apple.txt")
df.show()

+--------------------+
|               value|
+--------------------+
|                 "x"|
|"It's amazing.......|
|                   "|
|"Finally a MacBoo...|
|                   "|
|"Best Laptop unde...|
|                   "|
|"Pros:1. Light we...|
|                   "|
|"I have been a Ma...|
|                   "|
|"I am very upset ...|
|                   "|
|"Reason for 1 sta...|
|                   "|
|"Got the Apple Ma...|
|                   "|
|"White line comin...|
|                   "|
|"Bought this prod...|
+--------------------+
only showing top 20 rows



# Data Transformations

Here are the descriptions for `collect`, `map`, `filter`, `reduce`, and `cache` in Apache Spark:

### 1. collect
- **Description**: The `collect()` method retrieves all the elements of an RDD (or DataFrame) and brings them back to the driver program as a list.
- **Use Case**: It is typically used to view the entire dataset when it is small enough to fit into the driver’s memory. However, it should be used cautiously with large datasets to avoid memory overflow.

### 2. map
- **Description**: The `map()` transformation applies a specified function to each element of the RDD, resulting in a new RDD with the transformed elements.
- **Use Case**: It is used to perform operations like data transformation, processing, or modification on each element in the dataset, such as converting data types or extracting specific fields.

### 3. filter
- **Description**: The `filter()` transformation creates a new RDD by selecting elements that satisfy a given predicate (boolean condition).
- **Use Case**: It is useful for filtering out unwanted data based on specific criteria, such as keeping only rows that meet certain conditions in a dataset.

### 4. reduce
- **Description**: The `reduce()` action aggregates the elements of an RDD using a specified binary operation (a function that combines two elements).
- **Use Case**: It is often used for computations like summation, finding the maximum, or any other operation that combines all elements into a single result.

### 5. cache
- **Description**: The `cache()` method marks an RDD to be kept in memory across the nodes. It is lazy: the data is stored the first time an action computes the RDD, which allows faster access during subsequent computations.
- **Use Case**: It is beneficial when the same RDD is used multiple times in operations, as it avoids the need to recompute the RDD from scratch every time, thereby improving performance.

In summary, these functions and methods are fundamental operations in Spark, used for data processing, transformation, and performance optimization in distributed computing tasks.
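
As a rough analogy, the same operations can be sketched with Python's built-in `map`, `filter`, and `functools.reduce` on a local list. This runs on a single machine and is not Spark code; it is only meant to show what each operation computes. Like Spark transformations, Python's `map` and `filter` are lazy, and `list()` plays the role of `collect()` by forcing evaluation.

```python
from functools import reduce

data = [1, 2, 3, 4, 5]

# Lazy: nothing is computed yet, similar to rdd.map and rdd.filter.
doubled = map(lambda x: x * 2, data)
evens = filter(lambda x: x % 2 == 0, data)

# list() forces evaluation, similar to calling collect() on an RDD.
doubled_list = list(doubled)
evens_list = list(evens)

# reduce combines all elements into one value, similar to rdd.reduce.
total = reduce(lambda a, b: a + b, data)

print(doubled_list, evens_list, total)
```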


In [None]:
# Creating an RDD from a Python list
data = [1, 2, 3, 4, 5]
rdd=spark.sparkContext.parallelize(data)

In [None]:
rdd.collect()

[1, 2, 3, 4, 5]

In [None]:
rdd1=rdd.map(lambda x:x*2)
rdd1.collect()

[2, 4, 6, 8, 10]

In [None]:
# Select even numbers
rdd2 = rdd.filter(lambda x: x % 2 == 0)
rdd2.collect()

[2, 4]

In [None]:
# Sum all the elements
total_sum = rdd.reduce(lambda a, b: a + b)
print(total_sum)

15


In [None]:
# Cache the RDD
rdd.cache()

ParallelCollectionRDD[5] at readRDDFromFile at PythonRDD.scala:289
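
The effect of caching can be illustrated with a toy, Spark-free sketch. The class `ToyDataset` below is hypothetical and only mimics the idea: `cache()` marks the dataset, the first `collect()` afterwards computes and stores the result, and later calls reuse it instead of recomputing.

```python
class ToyDataset:
    """Hypothetical stand-in for an RDD; illustrates lazy caching only."""

    def __init__(self, compute):
        self._compute = compute      # function that produces the data
        self._use_cache = False
        self._cached = None
        self.compute_calls = 0       # how often the data was recomputed

    def cache(self):
        # Only marks the dataset; nothing is computed or stored yet.
        self._use_cache = True
        return self

    def collect(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.compute_calls += 1
        result = self._compute()
        if self._use_cache:
            self._cached = result
        return result


ds = ToyDataset(lambda: [x * 2 for x in [1, 2, 3, 4, 5]])

ds.collect()
ds.collect()
calls_without_cache = ds.compute_calls            # recomputed on every call

ds.cache()
ds.collect()
ds.collect()
calls_with_cache = ds.compute_calls - calls_without_cache  # computed once

print(calls_without_cache, calls_with_cache)
```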

# Create a DataFrame



In [None]:
#Create a DataFrame
data=[('James','','Smith','1991-04,01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
df=spark.createDataFrame(data=data,schema=columns)

In [None]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04,01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [None]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [None]:
columns=["language","users_count"]
data=[("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

In [None]:
#Create a RDD
rdd=spark.sparkContext.parallelize(data)

In [None]:
#Convert RDD into DataFrame
dfFromRDD1=rdd.toDF(schema=columns)
dfFromRDD1.show()

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+



In [None]:
dfFromRDD1.printSchema()

root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)
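
`users_count` is reported as `string` because the values in `data` are Python strings. One possible fix, sketched below in plain Python, is to convert the counts to `int` before building the RDD; Python integers are inferred as `long`, as the `salary` column in the earlier schema shows. The name `typed_data` is introduced here only for illustration.

```python
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]

# Convert the second field of every tuple from str to int.
typed_data = [(language, int(users_count)) for language, users_count in data]

print(typed_data)
```

Passing `typed_data` to `parallelize` instead of `data` should then give a numeric `users_count` column.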



In [None]:
# Create a DataFrame with StructType,StructField,StringType,IntegerType
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
data2=[('James','','Smith','1991-04,01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)]
schema=StructType([\
                         StructField("firstname",StringType(),True), \
                         StructField("middlename",StringType(),True), \
                         StructField("lastname",StringType(),True), \
                         StructField("dob",StringType(),True), \
                         StructField("gender",StringType(),True), \
                         StructField("salary",IntegerType(),True) \
                         ])
df=spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04,01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+



In [None]:
#To convert DataFrame to Pandas.
pandasDF=df.toPandas()

In [None]:
pandasDF

  firstname middlename  lastname         dob gender  salary
0     James                Smith  1991-04,01      M    3000
1   Michael       Rose            2000-05-19      M    4000
2    Robert             Williams  1978-09-05      M    4000
3     Maria       Anne     Jones  1967-12-01      F    4000
4       Jen       Mary     Brown  1980-02-17      F      -1


In [None]:
type(pandasDF)

In [None]:
#To find the shape of the Pandas DataFrame
pandasDF.shape

(5, 6)

In [None]:
#to show
pandasDF

  firstname middlename  lastname         dob gender  salary
0     James                Smith  1991-04,01      M    3000
1   Michael       Rose            2000-05-19      M    4000
2    Robert             Williams  1978-09-05      M    4000
3     Maria       Anne     Jones  1967-12-01      F    4000
4       Jen       Mary     Brown  1980-02-17      F      -1


In [None]:
#To print first five rows in pandasDF
pandasDF.head()

  firstname middlename  lastname         dob gender  salary
0     James                Smith  1991-04,01      M    3000
1   Michael       Rose            2000-05-19      M    4000
2    Robert             Williams  1978-09-05      M    4000
3     Maria       Anne     Jones  1967-12-01      F    4000
4       Jen       Mary     Brown  1980-02-17      F      -1


In [None]:
#To print last five rows in pandasDF
pandasDF.tail()

  firstname middlename  lastname         dob gender  salary
0     James                Smith  1991-04,01      M    3000
1   Michael       Rose            2000-05-19      M    4000
2    Robert             Williams  1978-09-05      M    4000
3     Maria       Anne     Jones  1967-12-01      F    4000
4       Jen       Mary     Brown  1980-02-17      F      -1


In [None]:
#To print information About pandasDF
pandasDF.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   firstname   5 non-null      object
 1   middlename  5 non-null      object
 2   lastname    5 non-null      object
 3   dob         5 non-null      object
 4   gender      5 non-null      object
 5   salary      5 non-null      int32 
dtypes: int32(1), object(5)
memory usage: 348.0+ bytes


In [None]:
#To Summarize the data
pandasDF.describe()

            salary
count          5.0
mean        2999.8
std    1732.483824
min           -1.0
25%         3000.0
50%         4000.0
75%         4000.0
max         4000.0
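
As a cross-check, some of these figures can be reproduced with Python's standard `statistics` module. This is a sketch on the five salary values from the DataFrame above; the `summary` dictionary is introduced here for illustration. `statistics.stdev` computes the sample standard deviation (dividing by n - 1), which is what the `std` row reports.

```python
import statistics

salaries = [3000, 4000, 4000, 4000, -1]

summary = {
    "count": len(salaries),
    "mean": statistics.mean(salaries),
    "std": statistics.stdev(salaries),    # sample standard deviation
    "min": min(salaries),
    "50%": statistics.median(salaries),
    "max": max(salaries),
}

for name, value in summary.items():
    print(name, value)
```

The `-1` salary is a placeholder value in the data, and it pulls the mean down to 2999.8 even though most salaries are 4000.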


In [None]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
data2=[('James','','Smith','1991-04,01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)]
schema=StructType([\
                         StructField("firstname",StringType(),True), \
                         StructField("middlename",StringType(),True), \
                         StructField("lastname",StringType(),True), \
                         StructField("dob",StringType(),True), \
                         StructField("gender",StringType(),True), \
                         StructField("salary",IntegerType(),True) \
                         ])
df=spark.createDataFrame(data=data2,schema=schema)

In [None]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04,01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [None]:
# truncate=False: do not truncate the output, so every value in each column
# is displayed in full. By default, show() truncates strings longer than
# 20 characters.
df.show(truncate=False)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|dob       |gender|salary|
+---------+----------+--------+----------+------+------+
|James    |          |Smith   |1991-04,01|M     |3000  |
|Michael  |Rose      |        |2000-05-19|M     |4000  |
|Robert   |          |Williams|1978-09-05|M     |4000  |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |
+---------+----------+--------+----------+------+------+


In [None]:
#truncate=3: This parameter specifies that the output should
#truncate long string values to a maximum of 3 characters
df.show(truncate=3)

+---------+----------+--------+---+------+------+
|firstname|middlename|lastname|dob|gender|salary|
+---------+----------+--------+---+------+------+
|      Jam|          |     Smi|199|     M|   300|
|      Mic|       Ros|        |200|     M|   400|
|      Rob|          |     Wil|197|     M|   400|
|      Mar|       Ann|     Jon|196|     F|   400|
|      Jen|       Mar|     Bro|198|     F|    -1|
+---------+----------+--------+---+------+------+
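
The truncation rule that these outputs suggest can be sketched in plain Python. The function `truncate_cell` is hypothetical and is inferred from the outputs in this notebook, not taken from the PySpark API: every value is first converted to a string (which is why the salary `3000` appears as `300`), values longer than the limit are cut, and an ellipsis is used only when the limit is at least 4 characters.

```python
def truncate_cell(value, truncate=20):
    """Mimic how show() appears to shorten a cell (illustrative only)."""
    text = str(value)
    if truncate <= 0 or len(text) <= truncate:
        return text                      # short enough, or truncation disabled
    if truncate < 4:
        return text[:truncate]           # too short for an ellipsis
    return text[:truncate - 3] + "..."   # keep room for the ellipsis


quote = "Be the change you wish to see in the world"

print(truncate_cell("James", 3))
print(truncate_cell(3000, 3))
print(truncate_cell(quote))
print(truncate_cell(quote, 25))
```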



In [None]:
#To display the First two rows
df.show(2)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04,01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
+---------+----------+--------+----------+------+------+
only showing top 2 rows



In [None]:
df.head(2)

[Row(firstname='James', middlename='', lastname='Smith', dob='1991-04,01', gender='M', salary=3000),
 Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000)]

In [None]:
df.show(2,truncate=3)

+---------+----------+--------+---+------+------+
|firstname|middlename|lastname|dob|gender|salary|
+---------+----------+--------+---+------+------+
|      Jam|          |     Smi|199|     M|   300|
|      Mic|       Ros|        |200|     M|   400|
+---------+----------+--------+---+------+------+
only showing top 2 rows



In [None]:
#Create a DataFrame
columns=["Seqno","Quote"]
data=[(1,"Be the change you wish to see in the world"),
      (2,"Everyone thinks of changing the world, but no one thinks of changing himself."),
      (3,"The purpose of our lives is to be happy."),
      (4,"Be cool.")]
df=spark.createDataFrame(data=data,schema=columns)
df.show()
df.show(truncate=False)
df.printSchema()

+-----+--------------------+
|Seqno|               Quote|
+-----+--------------------+
|    1|Be the change you...|
|    2|Everyone thinks o...|
|    3|The purpose of ou...|
|    4|            Be cool.|
+-----+--------------------+

+-----+-----------------------------------------------------------------------------+
|Seqno|Quote                                                                        |
+-----+-----------------------------------------------------------------------------+
|1    |Be the change you wish to see in the world                                   |
|2    |Everyone thinks of changing the world, but no one thinks of changing himself.|
|3    |The purpose of our lives is to be happy.                                     |
|4    |Be cool.                                                                     |
+-----+-----------------------------------------------------------------------------+

root
 |-- Seqno: long (nullable = true)
 |-- Quote: string (nullable = true)



In [None]:
df.show(truncate=25)

+-----+-------------------------+
|Seqno|                    Quote|
+-----+-------------------------+
|    1|Be the change you wish...|
|    2|Everyone thinks of cha...|
|    3|The purpose of our liv...|
|    4|                 Be cool.|
+-----+-------------------------+



In [None]:
df.show(5,truncate=False,vertical=True)

-RECORD 0------------------------------------------------------------------------------
 Seqno | 1                                                                             
 Quote | Be the change you wish to see in the world                                    
-RECORD 1------------------------------------------------------------------------------
 Seqno | 2                                                                             
 Quote | Everyone thinks of changing the world, but no one thinks of changing himself. 
-RECORD 2------------------------------------------------------------------------------
 Seqno | 3                                                                             
 Quote | The purpose of our lives is to be happy.                                      
-RECORD 3------------------------------------------------------------------------------
 Seqno | 4                                                                             
 Quote | Be cool.               

In [None]:
df.show(truncate=25,vertical=True)

-RECORD 0--------------------------
 Seqno | 1                         
 Quote | Be the change you wish... 
-RECORD 1--------------------------
 Seqno | 2                         
 Quote | Everyone thinks of cha... 
-RECORD 2--------------------------
 Seqno | 3                         
 Quote | The purpose of our liv... 
-RECORD 3--------------------------
 Seqno | 4                         
 Quote | Be cool.                  



In [None]:
structureData=[(("James","","Smith"),"36636","M",3100),
               (("Michael","Rose",""),"40288","F",5000),
               (("Robert","","Williams"),"4225","M",6200),
               (("Maria","Anna","Jacob"),"43432","F",5600),
               (("Jen","Mary","Brown"),"","F",-1)
              ]
# Nested schema: 'name' is itself a struct with three string fields
structureSchema=StructType([
    StructField('name',StructType([
        StructField('firstname',StringType(),True),
        StructField('middlename',StringType(),True),
        StructField('lastname',StringType(),True)
    ])),
    StructField('id',StringType(),True),
    StructField('gender',StringType(),True),
    StructField('salary',IntegerType(),True)
])

df=spark.createDataFrame(data=structureData,schema=structureSchema)
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
df.show()

+--------------------+-----+------+------+
|                name|   id|gender|salary|
+--------------------+-----+------+------+
|    {James, , Smith}|36636|     M|  3100|
|   {Michael, Rose, }|40288|     F|  5000|
|{Robert, , Williams}| 4225|     M|  6200|
|{Maria, Anna, Jacob}|43432|     F|  5600|
|  {Jen, Mary, Brown}|     |     F|    -1|
+--------------------+-----+------+------+



In [None]:
from pyspark.sql.functions import col,struct,when
updatedDf=df.withColumn("OtherInfo",
                        struct(col("id").alias("identifier"),
                               col("gender").alias("gender"),
                               col("salary").alias("salary"),
                               when(col("salary").cast(IntegerType())<2000,"Low")
                               .when(col("salary").cast(IntegerType())<4000,"Medium")
                               .otherwise("High").alias("Salary_Grade")
                               )).drop("id","gender","salary")
updatedDf.printSchema()
updatedDf.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+--------------------+------------------------+
|name                |OtherInfo               |
+--------------------+------------------------+
|{James, , Smith}    |{36636, M, 3100, Medium}|
|{Michael, Rose, }   |{40288, F, 5000, High}  |
|{Robert, , Williams}|{4225, M, 6200, High}   |
|{Maria, Anna, Jacob}|{43432, F, 5600, High}  |
|{Jen, Mary, Brown}  |{, F, -1, Low}          |
+--------------------+------------------------+
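
The `when(...).when(...).otherwise(...)` chain behaves like an if/elif/else ladder that is evaluated for each row. Below is a plain-Python sketch of the same grading rule; the function name `salary_grade` is hypothetical. Note that the placeholder salary `-1` is graded "Low" because it is below 2000.

```python
def salary_grade(salary):
    """Plain-Python mirror of the when/otherwise chain (illustrative only)."""
    if salary is None:
        # In Spark, a null comparison matches no when() branch,
        # so the row falls through to otherwise().
        return "High"
    if salary < 2000:
        return "Low"
    if salary < 4000:
        return "Medium"
    return "High"


salaries = [3100, 5000, 6200, 5600, -1]
grades = [salary_grade(s) for s in salaries]
print(grades)
```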



# Spark DataFrames and Datasets

### Differences:
RDDs: Unstructured collections of objects with no schema.

DataFrames: Structured collections of rows and columns with a schema.

Datasets: Type-safe DataFrames with a strongly typed schema.

# RDDs (Resilient Distributed Datasets)
Unstructured: No predefined schema or structure.

Low-level API: Requires manual operations for transformations and actions.

Performance-oriented: Gives fine-grained control over large-scale data processing, but without the automatic query optimization that DataFrames receive.

In [None]:
# Creating an RDD from a Python list
data = [1, 2, 3, 4, 5]
rdd=spark.sparkContext.parallelize(data)
rdd.collect()

[1, 2, 3, 4, 5]

# DataFrames
Structured: Organized as rows and columns with a schema.

Higher-level API: Provides SQL-like operations for data manipulation.

Domain-specific language: Uses domain-specific operators for data analysis.

In [None]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
data=[('James','','Smith','1991-04,01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)]
schema=StructType([\
                         StructField("firstname",StringType(),True), \
                         StructField("middlename",StringType(),True), \
                         StructField("lastname",StringType(),True), \
                         StructField("dob",StringType(),True), \
                         StructField("gender",StringType(),True), \
                         StructField("salary",IntegerType(),True) \
                         ])
df=spark.createDataFrame(data=data,schema=schema)
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04,01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



# Datasets
Type-safe DataFrames: Enforce type safety for column values.

Compiler-optimized: Leverage the compiler for type checking and optimization.

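Strongly typed: Provide compile-time type safety for operations.

Language support: The typed Dataset API is available in Scala and Java only. PySpark exposes DataFrames, so the cells below work with a DataFrame.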

In [None]:
#  load files
from google.colab import files
uploaded = files.upload()

Saving apple.txt to apple (1).txt


In [None]:
#  read file
df = spark.read.csv("apple.txt", header=True, inferSchema=True)
df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------