<a href="https://colab.research.google.com/github/ryanraba/casa6/blob/master/casa7experiments.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup libraries, tools, data etc


In [3]:
import os
print("installing pre-requisite packages...")
#os.system("pip install pycuda")
os.system("pip install pyarrow")
#os.system("pip install scikit-cuda")
os.system("apt-get install libgfortran3")

print("installing casatasks...")
os.system("pip install --extra-index-url https://casa-pip.nrao.edu:443/repository/pypi-group/simple casatools")
os.system("pip install --extra-index-url https://casa-pip.nrao.edu:443/repository/pypi-group/simple casatasks")

print("downloading MeasurementSet from CASAguide First Look at Imaging")
os.system("wget https://bulk.cv.nrao.edu/almadata/public/working/sis14_twhya_calibrated_flagged.ms.tar")
os.system("tar -xvf sis14_twhya_calibrated_flagged.ms.tar")

print('complete')

installing pre-requisite packages...
installing casatasks...
downloading MeasurementSet from CASAguide First Look at Imaging
complete


In [0]:
# keep this updated if dataset changes
prefix = 'sis14_twhya_calibrated_flagged'

# Convert MS to HDF5 and Apache Parquet formats

In [35]:
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from casatasks import exportuvfits
from astropy.io import fits

print('converting MS to FITS...')
exportuvfits(vis=prefix+'.ms', fitsfile=prefix+'.fits', overwrite=True)

print('processing FITS...')
fhdr = fits.getheader(prefix+'.fits')
fdata = fits.getdata(prefix+'.fits')

# make visibility data frame
# flatten channels to 2-D table
vd = fdata.data[:,0,0,0,:,:,:2].reshape(len(fdata),-1).astype(np.float32)
labels = np.concatenate([['p0r'+str(ii), 'p0i'+str(ii), 'p1r'+str(ii), 'p1i'+str(ii)] for ii in range(vd.shape[1]//4)])
vdf = pd.DataFrame(data=vd, columns=labels)

# put everything else together
md = {}
for par in np.unique(fdata.parnames):
  md[par] = fdata.par(par)

mdf = pd.DataFrame(data=md).merge(vdf, left_index=True, right_index=True)

# save dataframe to hdf5
print('saving FITS to HDF5...')
mdf.to_hdf(prefix+'.h5', key='mdf', mode='w')

# save to parquet
# this time, don't flatten to 2-D, keep visibilities as np.arrays
print('saving FITS to Parquet...')
vd = np.swapaxes(fdata.data[:,0,0,0,:,:,:2].reshape(len(fdata),-1,4).astype(np.float32), 2, 1)
labels = ['p0real', 'p0imag', 'p1real', 'p1imag']

# there must be a better way to do this, but I can't think of it right now
tt = np.empty(len(vd)*4,object).reshape(-1,4)
for ii in range(len(vd)):
  for jj in range(4):
    tt[ii,jj] = vd[ii,jj,:]
vdf = pd.DataFrame(data=tt, columns=labels)
mdf = pd.DataFrame(data=md).merge(vdf, left_index=True, right_index=True)

pq.write_to_dataset(pa.Table.from_pandas(mdf), 
                    root_path=prefix+'.parquet',
                    flavor='spark',
                    partition_cols=['BASELINE'])

print('complete')

converting MS to FITS...
processing FITS...
saving FITS to HDF5...
saving FITS to Parquet...
complete


In [0]:
# cleanup some memory
fhdr, fdata, vd, md, vdf, mdf, tt = [[], [], [], [], [], [], []]

# CPU Processing with Pandas/Numpy

In [37]:
import numpy as np
import scipy.signal as sp
import pandas as pd
import time

# load pandas dataframe
pdf = pd.read_hdf(prefix+'.h5', key='mdf')

# note that visibilities are stored as a flat table with column name pxyz 
# where x = polarization(?), y = r (real) or i (imaginary), and z = channel

r0chans = pdf.columns.values[pdf.columns.str.startswith('p0r')]
i0chans = pdf.columns.values[pdf.columns.str.startswith('p0i')]
r1chans = pdf.columns.values[pdf.columns.str.startswith('p1r')]
i1chans = pdf.columns.values[pdf.columns.str.startswith('p1i')]

# start the race
start = time.time()

################################################################

# I don't really know what we do with the second set of points, so lets just average in to the first
# let's also use complex numbers whenever the functions let us
c0 = pdf[r0chans].values + 1j*pdf[i0chans].values  # first set of complex visibilities
c1 = pdf[r1chans].values + 1j*pdf[i1chans].values  # second set of complex visibilities
vis = (c0 + c1)/2

# lets do some math to take up time
CONT_RMS = np.sqrt(np.mean(np.power(np.abs(vis),2), axis=1))
CONT_MAX = np.max(np.abs(vis), axis=1)
CONT_STD = np.std(np.abs(vis), axis=1)

################################################################
print('pandas elapsed time (s): ', time.time() - start)

print(vis[99,:10])
print(CONT_RMS)
print(CONT_MAX)
print(CONT_STD)


pandas elapsed time (s):  2.1277143955230713
[13.544712  -4.1790752j  11.385413  -1.468117j    6.5120544 -0.63292366j
  4.2294087 -8.978855j   16.402962  -3.087995j    7.351376  -1.7305609j
  4.109383  -3.5484805j  10.181942  -1.7112226j   9.926709  -4.0611053j
  4.697096 -13.006346j  ]
[10.410056 10.482974 10.374875 ... 10.956581 10.379412  8.499947]
[25.624918 21.068535 21.712814 ... 29.41013  27.530628 27.838688]
[4.1418676 4.1568103 3.8755553 ... 5.051679  4.639209  4.07304  ]


#  CPU Processing with Apache Spark
https://spark.apache.org/docs/latest/api/python/index.html

https://spark.apache.org/docs/latest/rdd-programming-guide.html

In [12]:
# install and configure PySpark
import os
print("installing spark...")
os.system("apt-get install openjdk-8-jdk-headless -qq")
os.system("pip install findspark")
os.system("wget https://www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz")
os.system("tar -xvzf spark-2.4.3-bin-hadoop2.7.tgz")

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
print('complete') 

installing spark...
complete


In [13]:
print('initializing spark...') 
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.driver.memory', '6g')
spark = spark.config('spark.executor.memory', '4g')
spark = spark.config('spark.executor.cores', '4')
spark = spark.master("local[*]").getOrCreate()
spark.sparkContext._conf.getAll() 

initializing spark...


[('spark.driver.host', '47dc05493f45'),
 ('spark.driver.memory', '6g'),
 ('spark.app.id', 'local-1559179842047'),
 ('spark.rdd.compress', 'True'),
 ('spark.executor.memory', '4g'),
 ('spark.driver.port', '45713'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.executor.cores', '4')]

In [38]:
import numpy as np
import time

# load spark dataframe
sdf = spark.read.parquet(prefix+'.parquet')
sdf.cache()
sdf.count() # trigger a load to cache data in memory

# start the race
start = time.time()

################################################################

# here are our map operations to be run on each row independently
def processElement(element):  
  c0 = (np.array(element.p0real) + 1j*np.array(element.p0imag)).astype(np.complex64)
  c1 = (np.array(element.p1real) + 1j*np.array(element.p1imag)).astype(np.complex64)
  vis = (c0 + c1)/2
  
  CONT_RMS = np.sqrt(np.mean(np.power(np.abs(vis),2), axis=0))
  CONT_MAX = np.max(np.abs(vis), axis=0)
  CONT_STD = np.std(np.abs(vis), axis=0)
  
  return vis, CONT_RMS, CONT_MAX, CONT_STD
  
# fire off our map function
prdd = sdf.rdd.map(lambda row: processElement(row)).collect()

################################################################
print('spark elapsed time (s): ', time.time() - start)

print(np.array(prdd)[99][0][:10])
print(np.array(prdd)[:,1])
print(np.array(prdd)[:,2])
print(np.array(prdd)[:,3])

spark elapsed time (s):  33.393213510513306
[15.368347  +3.496803j   2.352149  +6.232252j  -2.8458345 -3.4297054j
 -0.3756826 -4.024106j   2.6655807 -5.9877825j  0.42628008-4.511368j
  3.692929  +4.503709j   7.998578  +6.80622j   11.813255  +4.1906047j
 -1.5166845 +6.9134226j]
[11.02426 10.730138 10.810746 ... 12.037731 11.975934 12.026075]
[24.063694 23.316742 23.461426 ... 29.285095 26.563631 29.69375]
[4.315754 4.5817556 4.179552 ... 5.626123 5.6413765 5.4935527]


# GPU Processing with Tensorflow
https://www.tensorflow.org/api_docs/python/tf

In [42]:
import pandas as pd
import tensorflow as tf
import numpy as np
import time

# load pandas dataframe
pdf = pd.read_hdf(prefix+'.h5', key='mdf')

# note that visibilities are stored as a flat table with column name pxyz 
# where x = polarization(?), y = r (real) or i (imaginary), and z = channel

r0chans = pdf.columns.values[pdf.columns.str.startswith('p0r')]
i0chans = pdf.columns.values[pdf.columns.str.startswith('p0i')]
r1chans = pdf.columns.values[pdf.columns.str.startswith('p1r')]
i1chans = pdf.columns.values[pdf.columns.str.startswith('p1i')]

start = time.time()

###########################################################################

# create our tensor math sequence
with tf.device('/gpu:0'):
  c0 = pdf[r0chans].values + 1j*pdf[i0chans].values  # first set of complex visibilities
  c1 = pdf[r1chans].values + 1j*pdf[i1chans].values  # second set of complex visibilities
  
  aa = tf.placeholder(tf.complex64, shape=c0.shape)
  bb = tf.placeholder(tf.complex64, shape=c1.shape)
  tvis = tf.scalar_mul(0.5, tf.add(aa, bb))
  
  # lets do some math to take up time
  tCONT_RMS = tf.sqrt( tf.reduce_mean( tf.square( tf.abs(tvis) ), axis=1) )          
  tCONT_MAX = tf.reduce_max( tf.abs(tvis), axis=1)
  tCONT_STD = tf.math.reduce_std( tf.abs(tvis), axis=1)

with tf.Session() as sess:    
    vis = sess.run(tvis, feed_dict={aa: c0, bb: c1})
    CONT_RMS = sess.run(tCONT_RMS, feed_dict={aa: c0, bb: c1})
    CONT_MAX = sess.run(tCONT_MAX, feed_dict={aa: c0, bb: c1})
    CONT_STD = sess.run(tCONT_STD, feed_dict={aa: c0, bb: c1})

###########################################################################

print('tensorflow elapsed time (s): ', time.time() - start)

print(vis[99,:10])
print(CONT_RMS)
print(CONT_MAX)
print(CONT_STD)


tensorflow elapsed time (s):  1.1716275215148926
[13.544712  -4.1790752j  11.385413  -1.468117j    6.5120544 -0.63292366j
  4.2294087 -8.978855j   16.402962  -3.087995j    7.351376  -1.7305609j
  4.109383  -3.5484805j  10.181942  -1.7112226j   9.926709  -4.0611053j
  4.697096 -13.006346j  ]
[10.410055 10.482974 10.374873 ... 10.956581 10.379413  8.499947]
[25.624916 21.068535 21.712816 ... 29.41013  27.530628 27.838688]
[4.1418676 4.1568103 3.8755555 ... 5.051679  4.639209  4.07304  ]
