The data source generates files whose number of columns varies from row to row, often without a header row.

**Unstructured Data:** Data that doesn't have a fixed schema, such as logs, machine-generated data, or files with inconsistent columns.

**Variable Columns:** Files where different rows may have different numbers of columns, making it challenging to process using standard CSV readers.

In [0]:
dbutils.fs.put("/scenarios/dynamic_withoutheader.csv", """1,ravi
2,ram,bangalore
3,prasad,chennai,sample@gmail.com,9283923
4,Sam,Pune""")


Wrote 75 bytes.
Out[1]: True

###Initial Attempt: Using csv Method

In [0]:
df = spark.read.csv("/scenarios/dynamic_withoutheader.csv")
display(df)

_c0,_c1
1,ravi
2,ram
3,prasad
4,Sam


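Without a header or an explicit schema, `spark.read.csv` takes the number of columns from the first row. In the output above, every later row is cut down to those two columns, so the city, email, and phone values are silently lost. Rows shorter than the first would instead be padded with nulls.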
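
To make the failure mode concrete, here is a plain-Python sketch that mimics the result shown above: the column count is fixed by the first row, longer rows are truncated, and shorter rows are padded. It is a simplified model for illustration only, not Spark's actual parser, and `read_with_first_row_width` is a hypothetical helper name.

```python
import csv
import io

SAMPLE = """1,ravi
2,ram,bangalore
3,prasad,chennai,sample@gmail.com,9283923
4,Sam,Pune"""


def read_with_first_row_width(text):
    """Parse CSV text, fixing the column count from the first row (simplified model)."""
    rows = list(csv.reader(io.StringIO(text)))
    width = len(rows[0])
    # Pad every row with None, then cut it to the first row's width.
    return [(row + [None] * width)[:width] for row in rows]


truncated = read_with_first_row_width(SAMPLE)
for row in truncated:
    print(row)
```

Every row comes back with two fields, which matches the `_c0`, `_c1` output and shows why the extra values never reach the DataFrame.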

###Solution: Reading as Text File

In [0]:
df1 = spark.read.text("/scenarios/dynamic_withoutheader.csv")
display(df1)

value
"1,ravi"
"2,ram,bangalore"
"3,prasad,chennai,sample@gmail.com,9283923"
"4,Sam,Pune"


Instead of using `csv`, read the file with `spark.read.text`, which loads each line as a single string in a column named `value`. No fields are dropped at this stage, so rows of any width survive intact and can be split afterwards.

####Splitting the Data

In [0]:
from pyspark.sql.functions import split
# withColumn already names the new column, so no extra alias is needed.
df1 = df1.withColumn('splittable_col', split('value', ',')).drop('value')
display(df1)

splittable_col
"List(1, ravi)"
"List(2, ram, bangalore)"
"List(3, prasad, chennai, sample@gmail.com, 9283923)"
"List(4, Sam, Pune)"


The `split` function divides the single string column into an array of values on the delimiter (a comma). This array is then used to generate the actual columns. Note that the second argument of `split` is a regular expression, so a delimiter such as `|` must be escaped.
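
One caveat worth checking on real data: a plain comma split does not understand CSV quoting. The sketch below uses plain Python rather than Spark to show the difference, and the sample line is invented for illustration; it does not come from the file above.

```python
import csv

# Hypothetical row in which one field contains a quoted comma.
line = '5,"Kumar, A",Bangalore'

# A naive split cuts the quoted field in two.
naive = line.split(",")

# A CSV-aware parser keeps the quoted field together.
quote_aware = next(csv.reader([line]))

print(len(naive), naive)
print(len(quote_aware), quote_aware)
```

If the source files can contain quoted delimiters, the split step would need a quote-aware pattern or a different parsing strategy. For the sample data in this notebook, a plain comma split is sufficient.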

####Determine the Maximum Number of Columns

In [0]:
# Import Spark's max under an alias so it does not shadow Python's built-in max.
from pyspark.sql.functions import size, max as max_
max_columns = df1.select(max_(size('splittable_col'))).collect()[0][0]
print(max_columns)

5


The `size` function returns the number of elements in each row's array. Taking the maximum of those sizes gives the number of columns that need to be generated.
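
The same calculation can be checked by hand on the four sample rows. This is a plain-Python sketch for verifying the expected value, not part of the Spark job; it calls `builtins.max` explicitly in case `max` has been imported from `pyspark.sql.functions` in the same session.

```python
import builtins

lines = [
    "1,ravi",
    "2,ram,bangalore",
    "3,prasad,chennai,sample@gmail.com,9283923",
    "4,Sam,Pune",
]

# Split each line on commas, then count the elements per row.
arrays = [line.split(",") for line in lines]
sizes = [len(a) for a in arrays]

# The widest row determines how many columns to create.
max_columns = builtins.max(sizes)
print(sizes, max_columns)
```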


####Generating Dynamic Columns

In [0]:
for i in range(max_columns):
  # Positions beyond a row's array length come back as null (see the output below).
  df1 = df1.withColumn("col"+str(i), df1['splittable_col'][i])
display(df1)

splittable_col,col0,col1,col2,col3,col4
"List(1, ravi)",1,ravi,,,
"List(2, ram, bangalore)",2,ram,bangalore,,
"List(3, prasad, chennai, sample@gmail.com, 9283923)",3,prasad,chennai,sample@gmail.com,9283923.0
"List(4, Sam, Pune)",4,Sam,Pune,,


####Cleaning Up
**Drop Unnecessary Columns:** After creating the new columns, drop the intermediate `splittable_col` array column so that the DataFrame retains only the dynamically generated columns.

In [0]:
df1 = df1.drop('splittable_col')
display(df1)

col0,col1,col2,col3,col4
1,ravi,,,
2,ram,bangalore,,
3,prasad,chennai,sample@gmail.com,9283923.0
4,Sam,Pune,,


This approach handles files with a varying number of columns in PySpark: read each line as text, split it into an array, find the widest row, and generate that many columns. Shorter rows are padded with nulls, and no values from longer rows are lost.