Yes, you can use the xarray and pyarrow libraries in Python to convert netCDF files to Parquet format. Here's some example code to get you started:



In [1]:
%pip install --upgrade pyarrow

Collecting pyarrow
  Using cached pyarrow-11.0.0-cp311-cp311-win_amd64.whl (20.5 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-11.0.0
Note: you may need to restart the kernel to use updated packages.


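In [1]:
import xarray as xr
import pyarrow.parquet as pq

ModuleNotFoundError: No module named 'pyarrow'

This error typically means the kernel was started before pyarrow was installed; restart the kernel and run the cell again.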

In [2]:
# open the netCDF file with xarray
ds = xr.open_dataset('air.sig995.2012.nc')



In [80]:
import os

# Specify the path to the NetCDF file
file_path = 'air.sig995.2012.nc'

# Use the os.path.getsize() function to get the size of the file in bytes
file_size = os.path.getsize(file_path)

# Convert the file size from bytes to megabytes (optional)
file_size_mb = file_size / (1024 * 1024)

# Print the file size
print(f"The size of the NetCDF file is {file_size} bytes ({file_size_mb:.2f} MB)")


The size of the NetCDF file is 7748542 bytes (7.39 MB)


In [3]:
ds

In [4]:
# convert the xarray dataset to a pandas dataframe
df = ds.to_dataframe().reset_index()

In [5]:
df

          lat    lon        time         air
0        90.0    0.0  2012-01-01  234.500000
1        90.0    0.0  2012-01-02  235.339996
2        90.0    0.0  2012-01-03  238.700012
3        90.0    0.0  2012-01-04  240.130005
4        90.0    0.0  2012-01-05  256.470001
...       ...    ...         ...         ...
3847387 -90.0  357.5  2012-12-27  252.470001
3847388 -90.0  357.5  2012-12-28  250.420013
3847389 -90.0  357.5  2012-12-29  250.470001
3847390 -90.0  357.5  2012-12-30  249.550018


In [6]:
import pyarrow
print(pyarrow.__version__)

11.0.0


In [7]:
# convert the pandas dataframe to a pyarrow table
import pyarrow as pa
table = pa.Table.from_pandas(df)

In [9]:
# write the pyarrow table to a Parquet file
# (pq is pyarrow.parquet, imported at the top of the notebook)
pq.write_table(table, 'parquet_file_1.parquet')

You can then read the Parquet file into Spark with the spark.read.parquet function. First, though, compare its size with that of the original netCDF file:



In [81]:
import os

# Specify the path to the Parquet file
file_path = 'parquet_file_1.parquet'

# Use the os.path.getsize() function to get the size of the file in bytes
file_size = os.path.getsize(file_path)

# Convert the file size from bytes to megabytes (optional)
file_size_mb = file_size / (1024 * 1024)

# Print the file size
print(f"The size of the parquet file is {file_size} bytes ({file_size_mb:.2f} MB)")


The size of the parquet file is 6547568 bytes (6.24 MB)


# initiate Spark

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('netcdf-to-parquet').getOrCreate()


In [38]:
from pyspark.sql.functions import col

# Read the Parquet file into a DataFrame
df1 = spark.read.parquet('parquet_file_1.parquet')


In [39]:
# Select a subset of variables
subset = df1.select(col('lat'), col('air'))
subset.show()

+----+---------+
| lat|      air|
+----+---------+
|90.0|    234.5|
|90.0|   235.34|
|90.0|238.70001|
|90.0|   240.13|
|90.0|   256.47|
|90.0|260.65002|
|90.0|    256.1|
|90.0|   244.78|
|90.0|   253.43|
|90.0|   244.38|
|90.0|245.73001|
|90.0|   241.18|
|90.0|   244.93|
|90.0|248.23001|
|90.0|   253.53|
|90.0|268.65002|
|90.0|    259.1|
|90.0|   260.43|
|90.0|   256.68|
|90.0|251.55002|
+----+---------+
only showing top 20 rows



In [60]:
# Filter the data based on a condition
filtered = df1.filter(col('lat') > 80)
filtered.show()

+----+---+-------------------+---------+
| lat|lon|               time|      air|
+----+---+-------------------+---------+
|90.0|0.0|2012-01-01 00:00:00|    234.5|
|90.0|0.0|2012-01-02 00:00:00|   235.34|
|90.0|0.0|2012-01-03 00:00:00|238.70001|
|90.0|0.0|2012-01-04 00:00:00|   240.13|
|90.0|0.0|2012-01-05 00:00:00|   256.47|
|90.0|0.0|2012-01-06 00:00:00|260.65002|
|90.0|0.0|2012-01-07 00:00:00|    256.1|
|90.0|0.0|2012-01-08 00:00:00|   244.78|
|90.0|0.0|2012-01-09 00:00:00|   253.43|
|90.0|0.0|2012-01-10 00:00:00|   244.38|
|90.0|0.0|2012-01-11 00:00:00|245.73001|
|90.0|0.0|2012-01-12 00:00:00|   241.18|
|90.0|0.0|2012-01-13 00:00:00|   244.93|
|90.0|0.0|2012-01-14 00:00:00|248.23001|
|90.0|0.0|2012-01-15 00:00:00|   253.53|
|90.0|0.0|2012-01-16 00:00:00|268.65002|
|90.0|0.0|2012-01-17 00:00:00|    259.1|
|90.0|0.0|2012-01-18 00:00:00|   260.43|
|90.0|0.0|2012-01-19 00:00:00|   256.68|
|90.0|0.0|2012-01-20 00:00:00|251.55002|
+----+---+-------------------+---------+
only showing top 20 rows

In [61]:
# Group the data by a dimension and compute the average of a variable
grouped = df1.groupBy(col('time')).agg({'air': 'avg'})
grouped.show()

+-------------------+------------------+
|               time|          avg(air)|
+-------------------+------------------+
|2012-01-22 00:00:00|276.23148196726993|
|2012-07-11 00:00:00|280.33243449020966|
|2012-10-20 00:00:00|278.54230460656106|
|2012-01-16 00:00:00|277.76814804483587|
|2012-12-17 00:00:00| 276.6795621230359|
|2012-01-19 00:00:00|277.41109470036474|
|2012-05-12 00:00:00|279.18268508272445|
|2012-10-10 00:00:00| 278.5887950159825|
|2012-12-18 00:00:00|276.55451096719077|
|2012-04-23 00:00:00| 277.5920088984288|
|2012-07-08 00:00:00| 280.2102692573582|
|2012-08-24 00:00:00| 280.4929269909677|
|2012-03-22 00:00:00| 276.0524667191179|
|2012-04-20 00:00:00| 277.2611756898134|
|2012-07-23 00:00:00| 280.7047726298758|
|2012-03-30 00:00:00| 275.6068892195889|
|2012-05-05 00:00:00| 278.6593580202425|
|2012-01-26 00:00:00| 275.9756685776616|
|2012-05-07 00:00:00|279.06678966481576|
|2012-07-22 00:00:00|280.75354669591246|
+-------------------+------------------+
only showing top 20 rows

In [62]:
# Rename variables
renamed = df1.withColumnRenamed('time', 'Time')
renamed

DataFrame[lat: double, lon: double, Time: timestamp_ntz, air: float]

In [66]:
# Add a new variable
# new_col = df1.withColumn('air_x3', col('air') * 3)
# new_col.show()

In [17]:
# Reshape the data using pivot
# pivoted = df1.groupBy(col('time')).pivot('lat').agg({'air': 'avg'})
# pivoted.show()

In [67]:
# Sort the data by a variable
# (named sorted_df to avoid shadowing Python's built-in sorted())
sorted_df = df1.orderBy(col('air'))
sorted_df.show()

+-----+-----+-------------------+---------+
|  lat|  lon|               time|      air|
+-----+-----+-------------------+---------+
|-75.0|107.5|2012-06-02 00:00:00|193.80002|
|-75.0|105.0|2012-06-02 00:00:00|   194.13|
|-75.0|117.5|2012-06-03 00:00:00|    194.6|
|-80.0|125.0|2012-06-09 00:00:00|   194.68|
|-75.0|110.0|2012-06-02 00:00:00|194.70001|
|-80.0|122.5|2012-06-09 00:00:00|194.70001|
|-75.0|115.0|2012-06-03 00:00:00|   194.75|
|-77.5|105.0|2012-06-09 00:00:00|    194.9|
|-75.0|120.0|2012-06-03 00:00:00|    195.0|
|-77.5|102.5|2012-06-09 00:00:00|    195.0|
|-80.0|127.5|2012-06-09 00:00:00|    195.0|
|-75.0|115.0|2012-08-19 00:00:00|195.05002|
|-80.0|120.0|2012-06-09 00:00:00|    195.1|
|-75.0|117.5|2012-08-19 00:00:00|   195.13|
|-77.5|107.5|2012-06-09 00:00:00|   195.13|
|-77.5|120.0|2012-06-08 00:00:00|   195.13|
|-75.0|112.5|2012-08-19 00:00:00|195.30002|
|-75.0|112.5|2012-06-03 00:00:00|   195.34|
|-75.0|122.5|2012-06-08 00:00:00|   195.34|
|-77.5|117.5|2012-06-08 00:00:00

In [19]:
# Join two dataframes
# other_df = ...
# joined = df1.join(other_df, on='lat')

In [71]:
# Perform a calculation on the data
# calculation = df1.withColumn('new_air', col('air') * 2 - col('air') / 2)
# calculation.show()

In [72]:
# Aggregate the data by a dimension
aggregated = df1.groupBy(col('time')).agg({'lat': 'max','air': 'avg'})
aggregated.show()

+-------------------+------------------+--------+
|               time|          avg(air)|max(lat)|
+-------------------+------------------+--------+
|2012-01-22 00:00:00|276.23148196726993|    90.0|
|2012-07-11 00:00:00|280.33243449020966|    90.0|
|2012-10-20 00:00:00|278.54230460656106|    90.0|
|2012-01-16 00:00:00|277.76814804483587|    90.0|
|2012-12-17 00:00:00| 276.6795621230359|    90.0|
|2012-01-19 00:00:00|277.41109470036474|    90.0|
|2012-05-12 00:00:00|279.18268508272445|    90.0|
|2012-10-10 00:00:00| 278.5887950159825|    90.0|
|2012-12-18 00:00:00|276.55451096719077|    90.0|
|2012-04-23 00:00:00| 277.5920088984288|    90.0|
|2012-07-08 00:00:00| 280.2102692573582|    90.0|
|2012-08-24 00:00:00| 280.4929269909677|    90.0|
|2012-03-22 00:00:00| 276.0524667191179|    90.0|
|2012-04-20 00:00:00| 277.2611756898134|    90.0|
|2012-07-23 00:00:00| 280.7047726298758|    90.0|
|2012-03-30 00:00:00| 275.6068892195889|    90.0|
|2012-05-05 00:00:00| 278.6593580202425|    90.0|


These examples demonstrate common data manipulation operations with Spark's DataFrame API: selecting a subset of columns, filtering rows, grouping by a dimension and computing aggregates, renaming columns, adding derived columns, pivoting, sorting, joining DataFrames and performing column arithmetic.

You can adapt these operations to your needs and chain them into more complex transformations. Because Parquet is a columnar format, Spark reads only the columns a query uses and can skip row groups whose statistics rule out a filter, which keeps these operations efficient on large files.

# filter a Spark dataframe based on time and save the results to a Parquet file

In [73]:
# Filter the dataframe based on time, latitude, and air variable
filtered_df = df1.filter("time > '2012-01-22' AND time < '2012-05-22' AND lat > 30 AND lat < 100 AND air > 255")
filtered_df.show()

+----+---+-------------------+---------+
| lat|lon|               time|      air|
+----+---+-------------------+---------+
|90.0|0.0|2012-02-17 00:00:00|   259.45|
|90.0|0.0|2012-02-20 00:00:00|    255.4|
|90.0|0.0|2012-02-24 00:00:00|262.77002|
|90.0|0.0|2012-02-25 00:00:00|257.65002|
|90.0|0.0|2012-03-09 00:00:00|    263.3|
|90.0|0.0|2012-03-10 00:00:00|   262.45|
|90.0|0.0|2012-03-11 00:00:00|255.73001|
|90.0|0.0|2012-04-03 00:00:00|    260.0|
|90.0|0.0|2012-04-04 00:00:00|   262.22|
|90.0|0.0|2012-04-05 00:00:00|   255.68|
|90.0|0.0|2012-04-07 00:00:00|   256.28|
|90.0|0.0|2012-04-13 00:00:00|    255.5|
|90.0|0.0|2012-04-14 00:00:00|   259.55|
|90.0|0.0|2012-04-15 00:00:00|   261.47|
|90.0|0.0|2012-04-16 00:00:00|    258.3|
|90.0|0.0|2012-04-17 00:00:00|   259.87|
|90.0|0.0|2012-04-18 00:00:00|257.52002|
|90.0|0.0|2012-04-19 00:00:00|258.15002|
|90.0|0.0|2012-04-20 00:00:00|   256.12|
|90.0|0.0|2012-04-28 00:00:00|257.40002|
+----+---+-------------------+---------+
only showing top 20 rows
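The same condition can be written with pandas' DataFrame.query, which is a convenient way to sanity-check Spark's result on a sample small enough to fit in memory. The sketch below uses a few synthetic rows chosen so that each row fails a different part of the condition; note that lat < 100 never excludes anything, since latitudes run from -90 to 90.

```python
import pandas as pd

# Synthetic rows: the first matches, the others fail on time, lat and air respectively
sample = pd.DataFrame({
    "lat": [90.0, 90.0, 20.0, 90.0],
    "lon": [0.0, 0.0, 0.0, 0.0],
    "time": pd.to_datetime(["2012-02-17", "2012-01-10", "2012-02-17", "2012-03-01"]),
    "air": [259.45, 270.0, 290.0, 250.0],
})

# Same predicate as the Spark filter; pandas compares the date strings as timestamps
condition = ("time > '2012-01-22' and time < '2012-05-22' "
             "and lat > 30 and lat < 100 and air > 255")
subset = sample.query(condition)
print(subset)
```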

In [23]:
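# Spark DataFrames have no to_xarray() method, so convert via pandas first:
# ds = filtered_df.toPandas().set_index(['time', 'lat', 'lon']).to_xarray()
# ds.to_netcdf('filtered_data.nc')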

In [34]:
# Write the filtered dataframe to a new Parquet file
# filtered_df.write.mode('overwrite').parquet("filtered_data.parquet")

In [31]:
# filtered_data = spark.read.parquet('filtered_data.parquet')


In [None]:
# write spark dataframe into netcdf





# install hadoop

In [84]:
# import os
# import shutil
# import urllib.request
# import zipfile

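# # Download and extract Hadoop
# # Note: Apache publishes Hadoop binaries as .tar.gz archives, and local Spark
# # writes on Windows only need winutils.exe (and hadoop.dll) in %HADOOP_HOME%\bin,
# # not running HDFS/YARN services
# url = "https://downloads.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.zip"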
# filename = "hadoop.zip"
# urllib.request.urlretrieve(url, filename)

# with zipfile.ZipFile(filename, "r") as zip_ref:
#     zip_ref.extractall("C:/hadoop")

# # Set environment variables
# os.environ["HADOOP_HOME"] = "C:/hadoop/hadoop-3.3.1"
# # os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_281"

# # Copy configuration files
# src = os.path.join(os.getcwd(), "config")
# dst = os.path.join(os.environ["HADOOP_HOME"], "etc", "hadoop")
# shutil.copytree(src, dst)

# # Start Hadoop services
# os.system(os.path.join(os.environ["HADOOP_HOME"], "sbin", "start-dfs.cmd"))
# os.system(os.path.join(os.environ["HADOOP_HOME"], "sbin", "start-yarn.cmd"))


In [82]:
# Write the modified DataFrame back to Parquet format

filtered_df.write.parquet('filtered_df.parquet')


Py4JJavaError: An error occurred while calling o166.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	...
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	...
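The error means Spark's Hadoop layer could not locate winutils.exe: on Windows, even writing to the local disk needs HADOOP_HOME to point at a folder whose bin subfolder contains winutils.exe (and usually hadoop.dll) matching Spark's Hadoop version, as the WindowsProblems page linked in the message explains. The JVM picks up the variable only when it starts, so it has to be set before the first SparkSession is created; if Spark is already running, restart the kernel. A minimal sketch, using a hypothetical helper named configure_hadoop_home and a hypothetical C:\hadoop folder:

```python
import os
from pathlib import Path


def configure_hadoop_home(hadoop_home: str) -> Path:
    """Hypothetical helper: set HADOOP_HOME and put its bin folder on PATH.

    Must run before the first SparkSession is created, because the Spark JVM
    inherits these environment variables only when it starts.
    """
    home = Path(hadoop_home)
    bin_dir = home / "bin"
    if not (bin_dir / "winutils.exe").exists():
        # Not fatal here, but Spark will raise the same error later on Windows
        print(f"warning: {bin_dir / 'winutils.exe'} not found")
    os.environ["HADOOP_HOME"] = str(home)
    # Prepend bin so native helpers such as hadoop.dll can be located
    path_entries = os.environ.get("PATH", "").split(os.pathsep)
    if str(bin_dir) not in path_entries:
        os.environ["PATH"] = os.pathsep.join([str(bin_dir)] + path_entries)
    return bin_dir


# Usage (hypothetical location; call before SparkSession.builder...getOrCreate()):
# configure_hadoop_home(r"C:\hadoop")
```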


In [78]:
# # Write the modified DataFrame back to Parquet format
# # filtered_df.write.format('parquet').save('samir_2_1.parquet')

# # Workaround that bypasses Spark's Hadoop file layer: collect the (small)
# # filtered result to the driver and write it with pyarrow directly
# import pyarrow as pa
# import pyarrow.parquet as pq

# # Convert the Spark DataFrame to a Pandas DataFrame
# df = filtered_df.toPandas()

# # Convert the Pandas DataFrame to a PyArrow Table
# table = pa.Table.from_pandas(df, preserve_index=False)

# # Write the PyArrow Table to a Parquet file
# pq.write_table(table, 'output.parquet')

# # filtered_df.write.parquet('output.parquet')


# convert the filtered data back to netcdf

In [40]:
# Write the filtered dataframe to a NetCDF file using xarray
import xarray as xr

# Convert the Spark DataFrame to a Pandas DataFrame
df = filtered_df.toPandas()

# Make the coordinate columns the index so they become dimensions of the
# Dataset; grid points removed by the filter are filled with NaN
ds = xr.Dataset.from_dataframe(df.set_index(['time', 'lat', 'lon']))

# Write the xarray Dataset to a netCDF file
ds.to_netcdf('netcdf_file.nc')

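xr.Dataset.from_dataframe turns the DataFrame's index levels into dimensions, which is why the coordinate columns need to be in the index. With a plain integer index, the result has a single dimension called index and lat, lon and time become ordinary variables. With a (time, lat, lon) index, any combination missing from a filtered frame appears as NaN on the resulting grid. A small sketch with synthetic rows:

```python
import pandas as pd
import xarray as xr

# Three synthetic rows on a 2 x 2 lat/lon grid at one time step (one cell missing)
rows = pd.DataFrame({
    "lat": [90.0, 90.0, 87.5],
    "lon": [0.0, 2.5, 0.0],
    "time": pd.to_datetime(["2012-02-17"] * 3),
    "air": [259.45, 255.4, 262.77],
})

# Without set_index: one dimension named "index", coordinates stored as variables
flat = xr.Dataset.from_dataframe(rows)
print(dict(flat.sizes))

# With set_index: a (time, lat, lon) grid; the missing (87.5, 2.5) cell is NaN
gridded = xr.Dataset.from_dataframe(rows.set_index(["time", "lat", "lon"]))
print(dict(gridded.sizes))
print(gridded["air"].values)
```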


In [59]:
# # Load NetCDF file using xarray
# data = xr.open_dataset('netcdf_file.nc')

# var_ = data.variables['time'][:]
# var_.values

Note that this code assumes that your netCDF file contains a single dataset (no nested groups) and that the dataset fits into memory as a pandas dataframe. If your file is large or organizes its data into multiple groups, you may need to modify the code accordingly.
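Whether a dataset will fit can be estimated before calling to_dataframe: the flattened frame has one row per grid point (the product of the dimension sizes), and each row holds every data variable plus one value per dimension coordinate. The hypothetical helper below computes that figure from an xarray Dataset as a rough sketch (it ignores the index and any overhead of non-numeric columns) and checks it against a small synthetic dataset.

```python
import numpy as np
import pandas as pd
import xarray as xr


def estimate_dataframe_bytes(ds: xr.Dataset) -> int:
    """Rough size of ds.to_dataframe().reset_index(), excluding the index."""
    n_rows = int(np.prod([ds.sizes[d] for d in ds.dims]))
    # One column per data variable and one per dimension coordinate
    itemsizes = [ds[v].dtype.itemsize for v in ds.data_vars]
    itemsizes += [ds[d].dtype.itemsize for d in ds.dims if d in ds.coords]
    return n_rows * sum(itemsizes)


# Synthetic dataset shaped like the notebook's: time x lat x lon, float32 air
ds_small = xr.Dataset(
    {"air": (("time", "lat", "lon"), np.zeros((2, 3, 4), dtype="float32"))},
    coords={
        "time": pd.date_range("2012-01-01", periods=2),
        "lat": [90.0, 87.5, 85.0],
        "lon": [0.0, 2.5, 5.0, 7.5],
    },
)
estimate = estimate_dataframe_bytes(ds_small)
actual = ds_small.to_dataframe().reset_index().memory_usage(index=False).sum()
print(estimate, actual)
```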





If you have a large netCDF file that cannot fit into memory as a pandas dataframe, you can use dask to read and process the data in parallel, and then convert it to a PyArrow table and write it to Parquet format.

Here's some example code that uses dask to read a netCDF file, chunk it into smaller pieces, and write each chunk to a separate Parquet file:

In [None]:
import xarray as xr

# Open the netCDF file lazily; passing `chunks` makes xarray back every
# variable with a dask array split into blocks of 100 time steps
ds = xr.open_dataset('your_netcdf_file.nc', chunks={'time': 100})

# Flatten the dataset into a dask dataframe (one row per grid point,
# with the coordinates as columns), still without loading any data
ddf = ds.to_dask_dataframe()

# Write it with pyarrow; dask writes one Parquet file per partition
# inside the output directory and processes partitions in parallel
ddf.to_parquet('your_parquet_dir', engine='pyarrow', write_index=False)


This code opens the netCDF file lazily in chunks of 100 time steps, turns the dataset into a dask dataframe whose columns are the coordinates and data variables, and writes it to Parquet. Dask writes each partition to a separate file inside your_parquet_dir and computes the partitions in parallel across the available cores, so the full dataset does not need to fit in memory at once.

Note that this code assumes that the netCDF file has a time dimension to chunk along and that the variables you want to export are defined on the same grid. You may need to modify the code if your file is structured differently, and you may need to experiment with the chunk size to find a suitable value for your specific file and system.