<a href="https://colab.research.google.com/github/sharika-anjum/Pyspark-Helpsheet/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Install PySpark

In [None]:
!pip install pyspark



## Import PySpark and create a Spark session

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

*It is necessary to create a Spark session to interact with the Spark framework. The Spark session is the entry point to Spark functionality, including creating DataFrames, reading data, and executing SQL queries.*

**Why Create a Spark Session?**

- **Entry point for the DataFrame and SQL APIs:** the Spark session provides the context needed to create DataFrames and execute SQL queries.
- **Configuration management:** it lets you configure the Spark application, for example memory usage, parallelism, and other runtime settings.
- **Unified interface:** it offers a single interface to Spark SQL and DataFrames. (The typed Dataset API exists only in Scala and Java; PySpark works with DataFrames.)

**What Happens if You Don’t Create a Spark Session?**

If you don’t create a Spark session, you won’t be able to leverage the DataFrame and SQL APIs. This means you can’t perform operations like reading data from various sources (e.g., CSV, JSON), running SQL queries, or using the powerful transformations and actions provided by the DataFrame API.

**Use Cases Where a Spark Session Is Not Necessary**

In some very basic scenarios, you might use the older SparkContext directly, especially when dealing with RDDs (Resilient Distributed Datasets). However, even then, it is recommended to use a Spark session because it internally manages the SparkContext and provides a more user-friendly API.

**Conclusion**

Creating a Spark session is a fundamental step in PySpark applications. It initializes the environment for Spark and provides the necessary context to use its APIs effectively. Always start your PySpark code by creating a Spark session to ensure you can fully utilize Spark’s capabilities.

## Create a DataFrame

In [None]:
df = spark.createDataFrame([(1, "Sharika"), (2, "Mukesh")], ["id", "value"])

# Show the DataFrame
df.show()

+---+-------+
| id|  value|
+---+-------+
|  1|Sharika|
|  2| Mukesh|
+---+-------+



## Getting the first 2 records of the DataFrame

In [None]:
df.head(2)

[Row(id=1, value='Sharika'), Row(id=2, value='Mukesh')]

In [None]:
type(df)

`type()` shows which class an object belongs to.

Here, `df` is an instance of the `pyspark.sql.dataframe.DataFrame` class.

In [None]:
dir(df)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collect_as_arrow',
 '_ipython_key_completions_',
 '_jcols',
 '_jdf',
 '_jmap',
 '_joinAsOf',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_session',
 '_show_string',
 '_sort_cols',
 '_sql_ctx',
 '_support_repr_html',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'dropDuplicatesWithinWate

`dir()` lists the names of all methods and attributes available on the PySpark DataFrame object, so you can explore them and use them as needed.

In [None]:
dir(spark)

['Builder',
 '__annotations__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_activeSession',
 '_convert_from_pandas',
 '_createFromLocal',
 '_createFromRDD',
 '_create_dataframe',
 '_create_from_pandas_with_arrow',
 '_create_shell_session',
 '_getActiveSessionOrCreate',
 '_get_numpy_record_dtype',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedSession',
 '_jconf',
 '_jsc',
 '_jsparkSession',
 '_jvm',
 '_repr_html_',
 '_sc',
 'active',
 'addArtifact',
 'addArtifacts',
 'addTag',
 'builder',
 'catalog',
 'clearTags',
 'client',
 'conf',
 'copyFromLocalToFs',
 'createDataFrame',
 'getActiveSession',
 'getTags',
 'interruptAll',
 '

Similarly, this list contains the methods and attributes available on the `spark` object (a `SparkSession`), so you can explore them and use them as needed.

## Getting the column types of the DataFrame

In [None]:
df.dtypes

[('id', 'bigint'), ('value', 'string')]

## The help function

In [None]:
help(spark.createDataFrame)

Help on method createDataFrame in module pyspark.sql.session:

createDataFrame(data: Union[pyspark.rdd.RDD[Any], Iterable[Any], ForwardRef('PandasDataFrameLike'), ForwardRef('ArrayLike')], schema: Union[pyspark.sql.types.AtomicType, pyspark.sql.types.StructType, str, NoneType] = None, samplingRatio: Optional[float] = None, verifySchema: bool = True) -> pyspark.sql.dataframe.DataFrame method of pyspark.sql.session.SparkSession instance
    Creates a :class:`DataFrame` from an :class:`RDD`, a list, a :class:`pandas.DataFrame`
    or a :class:`numpy.ndarray`.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    data : :class:`RDD` or iterable
        an RDD of any kind of SQL data representation (:class:`Row`,
        :class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`,
        :class:`pandas.DataFrame` or :class:`numpy.ndarray`.
    schema : :class:`pyspark.sql.types.DataType`, str or list, op

`help()` prints the documentation of `createDataFrame`: its signature, what each parameter accepts, and usage notes.

## Getting the schema of the DataFrame

In [None]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)



## Reading CSV files

In [None]:
from google.colab import files

# Upload the CSV file
uploaded = files.upload()

Saving EmployeeDetails.csv to EmployeeDetails (1).csv


In [None]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, NoneType] = None, maxCharsPerColumn: Union[str, int, NoneType] = None, maxMalformedLogPerPartition: Union[str, int, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

In [None]:
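dfread = spark.read.csv("EmployeeDetails.csv", header=True, inferSchema=True)  # header: first row gives column names; inferSchema: extra pass over the data to guess column types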

In [None]:
dfread.show()

+---+--------------------+------+---+-------------+--------+-------+
| ID|                Name|Gender|Age|   Profession| Company|    CTC|
+---+--------------------+------+---+-------------+--------+-------+
|  1|Sharika Anjum Mondal|     F| 24|Data Engineer|     TCS| 809000|
|  2|        Mukesh Anand|     M| 26|       SDE II|HashedIn|1600000|
+---+--------------------+------+---+-------------+--------+-------+



## Writing CSV files

In [None]:
from pyspark.sql import dataframe

In [None]:
help(dataframe.DataFrame.write)

Help on property:

    Interface for saving the content of the non-streaming :class:`DataFrame` out into external
    storage.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Returns
    -------
    :class:`DataFrameWriter`
    
    Examples
    --------
    >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
    >>> type(df.write)
    <class '...readwriter.DataFrameWriter'>
    
    Write the DataFrame as a table.
    
    >>> _ = spark.sql("DROP TABLE IF EXISTS tab2")
    >>> df.write.saveAsTable("tab2")
    >>> _ = spark.sql("DROP TABLE tab2")



In [None]:
dfwrite = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
type(dfwrite.write.csv)

method

In [None]:
dfwrite.write.csv("writer", mode="overwrite", header=True)  # writes a directory named "writer" of CSV part files; "overwrite" replaces it if it exists. The generic .save() with no format writes Parquet by default

In [None]:
spark.read.csv("writer",header=True).show()

+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+



In [None]:
dfwrite.write.csv("writer",mode="append",header=True)
spark.read.csv("writer",header=True).show()

+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  2|Alice|
|  5|  Bob|
|  5|  Bob|
+---+-----+

