In [1]:
import pandas as pd

from pyspark.sql import SparkSession

In [3]:
# Reuse the active Spark session if one exists, otherwise start a new one
# registered under this application name.
spark = (
    SparkSession.builder
    .appName("aerofoildData")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/29 15:18:52 WARN Utils: Your hostname, cusat-SBKPF, resolves to a loopback address: 127.0.1.1; using 172.16.64.46 instead (on interface eno1)
25/07/29 15:18:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/29 15:18:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:

# NOTE(review): absolute, machine-specific path — consider a configurable data dir.
DATA_PATH = "/home/jd/BigDataAnalytics/datasets/airfoil_self_noise.dat"

# Tab-separated file read without a header option, so Spark assigns default
# column names; column types are inferred from the data.
df = spark.read.options(sep="\t", inferSchema=True).csv(DATA_PATH)

In [7]:
# Preview the raw frame (first 20 rows, long cell values truncated).
df.show(n=20, truncate=True)

+-----+---+------+----+----------+-------+
|  _c0|_c1|   _c2| _c3|       _c4|    _c5|
+-----+---+------+----+----------+-------+
|  800|0.0|0.3048|71.3|0.00266337|126.201|
| 1000|0.0|0.3048|71.3|0.00266337|125.201|
| 1250|0.0|0.3048|71.3|0.00266337|125.951|
| 1600|0.0|0.3048|71.3|0.00266337|127.591|
| 2000|0.0|0.3048|71.3|0.00266337|127.461|
| 2500|0.0|0.3048|71.3|0.00266337|125.571|
| 3150|0.0|0.3048|71.3|0.00266337|125.201|
| 4000|0.0|0.3048|71.3|0.00266337|123.061|
| 5000|0.0|0.3048|71.3|0.00266337|121.301|
| 6300|0.0|0.3048|71.3|0.00266337|119.541|
| 8000|0.0|0.3048|71.3|0.00266337|117.151|
|10000|0.0|0.3048|71.3|0.00266337|115.391|
|12500|0.0|0.3048|71.3|0.00266337|112.241|
|16000|0.0|0.3048|71.3|0.00266337|108.721|
|  500|0.0|0.3048|55.5|0.00283081|126.416|
|  630|0.0|0.3048|55.5|0.00283081|127.696|
|  800|0.0|0.3048|55.5|0.00283081|128.086|
| 1000|0.0|0.3048|55.5|0.00283081|126.966|
| 1250|0.0|0.3048|55.5|0.00283081|126.086|
| 1600|0.0|0.3048|55.5|0.00283081|126.986|
+-----+---+

In [8]:
# Human-readable names for the six positional columns, in file order.
# The last entry is treated as the target by later cells.
columns = [
    "Frequency", "Angle_of_attack", "Chord_length",
    "Free_stream_velocity", "Suction_side_displacement_thickness",
    "Sound_pressure_level",
]

# toDF renames all columns positionally in one call.
df = df.toDF(*columns)

# Confirm the new names and the inferred types, then peek at a few rows.
df.printSchema()
df.show(n=5)


root
 |-- Frequency: integer (nullable = true)
 |-- Angle_of_attack: double (nullable = true)
 |-- Chord_length: double (nullable = true)
 |-- Free_stream_velocity: double (nullable = true)
 |-- Suction_side_displacement_thickness: double (nullable = true)
 |-- Sound_pressure_level: double (nullable = true)

+---------+---------------+------------+--------------------+-----------------------------------+--------------------+
|Frequency|Angle_of_attack|Chord_length|Free_stream_velocity|Suction_side_displacement_thickness|Sound_pressure_level|
+---------+---------------+------------+--------------------+-----------------------------------+--------------------+
|      800|            0.0|      0.3048|                71.3|                         0.00266337|             126.201|
|     1000|            0.0|      0.3048|                71.3|                         0.00266337|             125.201|
|     1250|            0.0|      0.3048|                71.3|                         0.0026633

In [9]:
# Summary statistics for every column of the frame.
summary_stats = df.describe()
summary_stats.show()

25/07/29 15:25:26 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+------------------+-------------------+--------------------+-----------------------------------+--------------------+
|summary|         Frequency|   Angle_of_attack|       Chord_length|Free_stream_velocity|Suction_side_displacement_thickness|Sound_pressure_level|
+-------+------------------+------------------+-------------------+--------------------+-----------------------------------+--------------------+
|  count|              1503|              1503|               1503|                1503|                               1503|                1503|
|   mean|2886.3805721889553| 6.782302062541517|0.13654823685961226|  50.860745176314175|               0.011139880391217556|  124.83594278110434|
| stddev| 3152.573136930669|5.9181281248864765|0.09354072837396635|  15.572784395385678|               0.013150234266814782|   6.898656621628715|
|    min|               200|               0.0|             0.0254|                31.7|                         4.00682E-4|

In [10]:
# Pearson correlation of each predictor column with the target column.
#
# The target name is taken from the same list that the feature slice uses, so
# the two cannot drift apart. The loop variable is intentionally not named
# `col`: at notebook top level it stays bound after the loop and would shadow
# `pyspark.sql.functions.col` if that helper is imported.
target_col = columns[-1]  # "Sound_pressure_level"
for feature_name in columns[:-1]:  # every column except the target
    corr = df.stat.corr(feature_name, target_col)
    print(f"Correlation between {feature_name} and {target_col}: {corr:.4f}")


Correlation between Frequency and Sound_pressure_level: -0.3907
Correlation between Angle_of_attack and Sound_pressure_level: -0.1561
Correlation between Chord_length and Sound_pressure_level: -0.2362
Correlation between Free_stream_velocity and Sound_pressure_level: 0.1251
Correlation between Suction_side_displacement_thickness and Sound_pressure_level: -0.3127


In [11]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator


In [12]:
# Predictor columns that are packed into the single vector column below.
feature_cols = [
    "Frequency", "Angle_of_attack", "Chord_length",
    "Free_stream_velocity", "Suction_side_displacement_thickness",
]

# Combine the predictor columns into one "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled = assembler.transform(df)

# Keep only the feature vector and the target column.
data = assembled.select("features", "Sound_pressure_level")
